This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/dev-offline-sync by this push:
new a793963543 [INLONG-10247][Manager] Support schedule information
management for offline sync (#10254)
a793963543 is described below
commit a79396354389699877919625932f619bfb95a4c5
Author: AloysZhang <[email protected]>
AuthorDate: Thu May 23 18:18:31 2024 +0800
[INLONG-10247][Manager] Support schedule information management for
offline sync (#10254)
---
.../client/api/inner/client/ClientFactory.java | 2 +
.../api/inner/client/InLongScheduleClient.java | 67 +++++++++
.../client/api/service/InLongScheduleApi.java | 49 +++++++
.../inlong/manager/common/enums/ErrorCodeEnum.java | 3 +
.../manager/common/enums/OperationTarget.java | 4 +-
.../{OperationTarget.java => ScheduleStatus.java} | 52 ++++---
.../inlong/manager/dao/entity/ScheduleEntity.java | 61 ++++++++
.../manager/dao/mapper/ScheduleEntityMapper.java} | 34 ++---
.../resources/mappers/ScheduleEntityMapper.xml | 144 +++++++++++++++++++
.../manager/dao/mapper/ScheduleEntityTest.java | 118 ++++++++++++++++
.../inlong/manager/pojo/schedule/ScheduleInfo.java | 82 +++++++++++
.../manager/pojo/schedule/ScheduleInfoRequest.java | 76 ++++++++++
.../manager/service/schedule/ScheduleService.java | 71 ++++++++++
.../service/schedule/ScheduleServiceImpl.java | 155 +++++++++++++++++++++
.../main/resources/h2/apache_inlong_manager.sql | 31 +++++
.../manager-web/sql/apache_inlong_manager.sql | 30 ++++
inlong-manager/manager-web/sql/changes-1.13.0.sql | 30 ++++
.../web/controller/InLongSchedulerController.java | 88 ++++++++++++
18 files changed, 1045 insertions(+), 52 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
index 3c67f1fa71..01c9fb6473 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
@@ -55,6 +55,7 @@ public class ClientFactory {
private final AuditClient auditClient;
private final InlongTenantClient inlongTenantClient;
private final InlongTenantRoleClient inlongTenantRoleClient;
+ private final InLongScheduleClient inLongScheduleClient;
public ClientFactory(ClientConfiguration configuration) {
groupClient = new InlongGroupClient(configuration);
@@ -74,5 +75,6 @@ public class ClientFactory {
auditClient = new AuditClient(configuration);
inlongTenantClient = new InlongTenantClient(configuration);
inlongTenantRoleClient = new InlongTenantRoleClient(configuration);
+ inLongScheduleClient = new InLongScheduleClient(configuration);
}
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InLongScheduleClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InLongScheduleClient.java
new file mode 100644
index 0000000000..86638dbae8
--- /dev/null
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InLongScheduleClient.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.InLongScheduleApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+public class InLongScheduleClient {
+
+ private InLongScheduleApi scheduleApi;
+
+ public InLongScheduleClient(ClientConfiguration clientConfiguration) {
+ scheduleApi =
ClientUtils.createRetrofit(clientConfiguration).create(InLongScheduleApi.class);
+ }
+
+ public Integer createScheduleInfo(ScheduleInfoRequest request) {
+ Response<Integer> response =
ClientUtils.executeHttpCall(scheduleApi.createSchedule(request));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ public Boolean scheduleInfoExist(String groupId) {
+ Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+ Response<Boolean> response =
ClientUtils.executeHttpCall(scheduleApi.exist(groupId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ public Boolean updateScheduleInfo(ScheduleInfoRequest request) {
+ Response<Boolean> response =
ClientUtils.executeHttpCall(scheduleApi.update(request));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ public ScheduleInfo getScheduleInfo(String groupId) {
+ Response<ScheduleInfo> response =
ClientUtils.executeHttpCall(scheduleApi.get(groupId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
+ public Boolean deleteScheduleInfo(String groupId) {
+ Response<Boolean> response =
ClientUtils.executeHttpCall(scheduleApi.delete(groupId));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InLongScheduleApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InLongScheduleApi.java
new file mode 100644
index 0000000000..0da7be3d6f
--- /dev/null
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InLongScheduleApi.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.service;
+
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+import retrofit2.Call;
+import retrofit2.http.Body;
+import retrofit2.http.DELETE;
+import retrofit2.http.GET;
+import retrofit2.http.POST;
+import retrofit2.http.Path;
+import retrofit2.http.Query;
+
+public interface InLongScheduleApi {
+
+ @POST("schedule/save")
+ Call<Response<Integer>> createSchedule(@Body ScheduleInfoRequest request);
+
+ @GET("schedule/exist/{groupId}")
+ Call<Response<Boolean>> exist(@Path("groupId") String groupId);
+
+ @POST("schedule/update")
+ Call<Response<Boolean>> update(@Body ScheduleInfoRequest request);
+
+ @GET("schedule/get")
+ Call<Response<ScheduleInfo>> get(@Query("groupId") String groupId);
+
+ @DELETE("schedule/delete/{groupId}")
+ Call<Response<Boolean>> delete(@Path("groupId") String groupId);
+
+}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 98a7f50bd7..ce3bf9d015 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -125,6 +125,9 @@ public enum ErrorCodeEnum {
MQ_TYPE_IS_NULL(1600, "MQ type is null"),
MQ_TYPE_NOT_SUPPORT(1601, "MQ type '%s' not support"),
+ SCHEDULE_NOT_FOUND(1700, "Schedule info not found"),
+ SCHEDULE_DUPLICATE(1701, "Schedule info already exist"),
+
WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no
operation authority"),
WORKFLOW_DELETE_RECORD_FAILED(4002, "Workflow delete record failure"),
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
index e6c866a0e1..93d4e1f92c 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
@@ -44,6 +44,8 @@ public enum OperationTarget {
INLONG_ROLE,
- TENANT_ROLE
+ TENANT_ROLE,
+
+ SCHEDULE
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
similarity index 58%
copy from
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
copy to
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
index e6c866a0e1..cb256a491c 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java
@@ -17,33 +17,29 @@
package org.apache.inlong.manager.common.enums;
-/**
- * Operation target
- */
-public enum OperationTarget {
-
- TENANT,
-
- GROUP,
-
- STREAM,
-
- SOURCE,
-
- SINK,
-
- CONSUME,
-
- WORKFLOW,
-
- NODE,
-
- CLUSTER,
-
- TRANSFORM,
-
- INLONG_ROLE,
-
- TENANT_ROLE
+import lombok.Getter;
+
+@Getter
+public enum ScheduleStatus {
+
+ NEW(100, "new"),
+ DELETED(40, "deleted");
+
+ private final Integer code;
+ private final String description;
+
+ ScheduleStatus(Integer code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ public static ScheduleStatus forCode(int code) {
+ for (ScheduleStatus status : values()) {
+ if (status.getCode() == code) {
+ return status;
+ }
+ }
+ throw new IllegalStateException(String.format("Illegal code=%s for
ScheduleStatus", code));
+ }
}
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
new file mode 100644
index 0000000000..75237343cb
--- /dev/null
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.entity;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.sql.Timestamp;
+import java.util.Date;
+
+@Data
+public class ScheduleEntity implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private Integer id;
+ // inLong group id
+ private String inlongGroupId;
+ // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+ private Integer scheduleType;
+ // time unit for offline task schedule interval, support [month, week,
day, hour, minute, oneway]
+ // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+ private String scheduleUnit;
+ private Integer scheduleInterval;
+ // schedule start time, long type timestamp
+ private Timestamp startTime;
+ // schedule end time, long type timestamp
+ private Timestamp endTime;
+ // delay time to start task, in minutes
+ private Integer delayTime;
+ // if task depend on itself
+ private Integer selfDepend;
+ private Integer taskParallelism;
+ // expression of crontab, used when scheduleType is crontab
+ private String crontabExpression;
+
+ private Integer status;
+ private Integer previousStatus;
+ private Integer isDeleted;
+ private String creator;
+ private String modifier;
+ private Date createTime;
+ private Date modifyTime;
+ private Integer version;
+
+}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java
similarity index 60%
copy from
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
copy to
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java
index e6c866a0e1..23e8e23c31 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationTarget.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityMapper.java
@@ -15,35 +15,23 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.enums;
+package org.apache.inlong.manager.dao.mapper;
-/**
- * Operation target
- */
-public enum OperationTarget {
-
- TENANT,
-
- GROUP,
-
- STREAM,
-
- SOURCE,
-
- SINK,
-
- CONSUME,
+import org.apache.inlong.manager.dao.entity.ScheduleEntity;
- WORKFLOW,
+import org.apache.ibatis.annotations.Param;
+import org.springframework.stereotype.Repository;
- NODE,
+@Repository
+public interface ScheduleEntityMapper {
- CLUSTER,
+ int insert(ScheduleEntity scheduleEntity);
- TRANSFORM,
+ ScheduleEntity selectByPrimaryKey(Integer id);
- INLONG_ROLE,
+ ScheduleEntity selectByGroupId(String groupId);
- TENANT_ROLE
+ int updateByIdSelective(ScheduleEntity scheduleEntity);
+ int deleteByGroupId(@Param("inlongGroupId") String inlongGroupId);
}
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
new file mode 100644
index 0000000000..d719aa8988
--- /dev/null
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper">
+ <resultMap id="BaseResultMap"
type="org.apache.inlong.manager.dao.entity.ScheduleEntity">
+ <id column="id" jdbcType="INTEGER" property="id"/>
+ <result column="inlong_group_id" jdbcType="VARCHAR"
property="inlongGroupId"/>
+ <result column="schedule_type" jdbcType="INTEGER"
property="scheduleType"/>
+ <result column="schedule_unit" jdbcType="VARCHAR"
property="scheduleUnit"/>
+ <result column="schedule_interval" jdbcType="INTEGER"
property="scheduleInterval"/>
+ <result column="start_time" jdbcType="TIMESTAMP" property="startTime"/>
+ <result column="end_time" jdbcType="TIMESTAMP" property="endTime"/>
+ <result column="delay_time" jdbcType="INTEGER" property="delayTime"/>
+ <result column="self_depend" jdbcType="INTEGER" property="selfDepend"/>
+ <result column="task_parallelism" jdbcType="INTEGER"
property="taskParallelism"/>
+ <result column="crontab_expression" jdbcType="VARCHAR"
property="crontabExpression"/>
+
+ <result column="status" jdbcType="INTEGER" property="status"/>
+ <result column="previous_status" jdbcType="INTEGER"
property="previousStatus"/>
+ <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+ <result column="creator" jdbcType="VARCHAR" property="creator"/>
+ <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+ <result column="create_time" jdbcType="TIMESTAMP"
property="createTime"/>
+ <result column="modify_time" jdbcType="TIMESTAMP"
property="modifyTime"/>
+ <result column="version" jdbcType="INTEGER" property="version"/>
+ </resultMap>
+
+ <sql id="Base_Column_List">
+ id, inlong_group_id, schedule_type, schedule_unit, schedule_interval,
start_time,
+ end_time, delay_time, self_depend, task_parallelism,
crontab_expression,
+ status, previous_status, is_deleted, creator, modifier,
create_time, modify_time, version
+ </sql>
+
+ <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+
parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity">
+ insert into schedule_config (id, inlong_group_id, schedule_type,
schedule_unit,
+ schedule_interval, start_time, end_time,
delay_time,
+ self_depend, task_parallelism,
crontab_expression,
+ status, previous_status, creator,
modifier)
+ values (#{id, jdbcType=INTEGER}, #{inlongGroupId, jdbcType=VARCHAR},
+ #{scheduleType, jdbcType=INTEGER}, #{scheduleUnit,
jdbcType=VARCHAR},
+ #{scheduleInterval, jdbcType=INTEGER}, #{startTime,
jdbcType=TIMESTAMP},
+ #{endTime, jdbcType=TIMESTAMP}, #{delayTime, jdbcType=INTEGER},
+ #{selfDepend, jdbcType=INTEGER}, #{taskParallelism,
jdbcType=INTEGER},
+ #{crontabExpression, jdbcType=VARCHAR},
#{status,jdbcType=INTEGER},
+ #{previousStatus,jdbcType=INTEGER},
#{creator,jdbcType=VARCHAR},
+ #{modifier,jdbcType=VARCHAR})
+ </insert>
+
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer"
resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from schedule_config
+ where id = #{id,jdbcType=INTEGER}
+ and is_deleted = 0
+ </select>
+
+ <select id="selectByGroupId" parameterType="java.lang.String"
resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from schedule_config
+ where inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+ and is_deleted = 0
+ </select>
+
+ <update id="updateByIdSelective"
parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity">
+ update schedule_config
+ <set>
+ <if test="inlongGroupId != null">
+ inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR},
+ </if>
+ <if test="scheduleType != null">
+ schedule_type = #{scheduleType, jdbcType=INTEGER},
+ </if>
+ <if test="scheduleUnit !=null">
+ schedule_unit = #{scheduleUnit, jdbcType=VARCHAR},
+ </if>
+ <if test="scheduleInterval != null">
+ schedule_interval = #{scheduleInterval, jdbcType=INTEGER},
+ </if>
+ <if test="startTime != null">
+ start_time = #{startTime, jdbcType=TIMESTAMP},
+ </if>
+ <if test="endTime != null">
+ end_time = #{endTime, jdbcType=TIMESTAMP},
+ </if>
+ <if test="delayTime != null">
+ delay_time = #{delayTime, jdbcType=INTEGER},
+ </if>
+ <if test="selfDepend != null">
+ self_depend = #{selfDepend, jdbcType=INTEGER},
+ </if>
+ <if test="taskParallelism != null">
+ task_parallelism = #{taskParallelism, jdbcType=INTEGER},
+ </if>
+ <if test="crontabExpression != null">
+ crontab_expression = #{crontabExpression, jdbcType=VARCHAR},
+ </if>
+ <if test="status != null">
+ status = #{status, jdbcType=VARCHAR},
+ </if>
+ <if test="previousStatus != null">
+ previous_status = #{previousStatus, jdbcType=VARCHAR},
+ </if>
+ <if test="isDeleted != null">
+ is_deleted = #{isDeleted},
+ </if>
+ <if test="creator != null">
+ creator = #{creator, jdbcType=VARCHAR},
+ </if>
+ <if test="modifier != null">
+ modifier = #{modifier, jdbcType=VARCHAR},
+ </if>
+ version = #{version, jdbcType=INTEGER} + 1
+ </set>
+ <where>
+ id = #{id, jdbcType=INTEGER}
+ and version = #{version,jdbcType=INTEGER}
+ </where>
+ </update>
+
+ <delete id="deleteByGroupId">
+ delete
+ from schedule_config
+ where inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+ </delete>
+
+</mapper>
\ No newline at end of file
diff --git
a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
new file mode 100644
index 0000000000..ef4207bc84
--- /dev/null
+++
b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import org.apache.inlong.manager.dao.DaoBaseTest;
+import org.apache.inlong.manager.dao.entity.ScheduleEntity;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.sql.Timestamp;
+import java.util.Date;
+
+public class ScheduleEntityTest extends DaoBaseTest {
+
+ public static final String GROUP_ID_PREFIX = "test_group_";
+ public static final String USER = "admin";
+ public static final int SCHEDULE_TYPE = 0;
+ public static final int SCHEDULE_TYPE_NEW = 1;
+ public static final String SCHEDULE_UNIT = "H";
+ public static final String SCHEDULE_UNIT_NEW = "D";
+ public static final int SCHEDULE_INTERVAL = 1;
+ public static final int SCHEDULE_INTERVAL_NEW = 1;
+ public static final Timestamp DEFAULT_TIME = new
Timestamp(System.currentTimeMillis());
+
+ @Autowired
+ ScheduleEntityMapper scheduleEntityMapper;
+
+ @Test
+ public void testSelectByGroupId() throws Exception {
+ ScheduleEntity scheduleEntity = genEntity();
+ scheduleEntityMapper.insert(scheduleEntity);
+ ScheduleEntity entityQueried =
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
+ Assertions.assertEquals(scheduleEntity.getInlongGroupId(),
entityQueried.getInlongGroupId());
+ Assertions.assertEquals(SCHEDULE_TYPE,
entityQueried.getScheduleType());
+ Assertions.assertEquals(SCHEDULE_UNIT,
entityQueried.getScheduleUnit());
+ Assertions.assertEquals(SCHEDULE_INTERVAL,
entityQueried.getScheduleInterval());
+ Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime());
+ Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime());
+ Assertions.assertEquals(USER, entityQueried.getCreator());
+ }
+
+ @Test
+ public void testUpdate() throws Exception {
+ ScheduleEntity scheduleEntity = genEntity();
+ scheduleEntityMapper.insert(scheduleEntity);
+ ScheduleEntity entityQueried =
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
+ Assertions.assertEquals(scheduleEntity.getInlongGroupId(),
entityQueried.getInlongGroupId());
+ Assertions.assertEquals(SCHEDULE_TYPE,
entityQueried.getScheduleType());
+ Assertions.assertEquals(SCHEDULE_UNIT,
entityQueried.getScheduleUnit());
+ Assertions.assertEquals(SCHEDULE_INTERVAL,
entityQueried.getScheduleInterval());
+ Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime());
+ Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime());
+ Assertions.assertEquals(USER, entityQueried.getCreator());
+
+ entityQueried.setScheduleType(SCHEDULE_TYPE_NEW);
+ entityQueried.setScheduleUnit(SCHEDULE_UNIT_NEW);
+ entityQueried.setScheduleInterval(SCHEDULE_INTERVAL_NEW);
+ scheduleEntityMapper.updateByIdSelective(entityQueried);
+ entityQueried =
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
+ Assertions.assertEquals(scheduleEntity.getInlongGroupId(),
entityQueried.getInlongGroupId());
+ Assertions.assertEquals(SCHEDULE_TYPE_NEW,
entityQueried.getScheduleType());
+ Assertions.assertEquals(SCHEDULE_UNIT_NEW,
entityQueried.getScheduleUnit());
+ Assertions.assertEquals(SCHEDULE_INTERVAL_NEW,
entityQueried.getScheduleInterval());
+ Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime());
+ Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime());
+ Assertions.assertEquals(USER, entityQueried.getCreator());
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ ScheduleEntity scheduleEntity = genEntity();
+ scheduleEntityMapper.insert(scheduleEntity);
+ ScheduleEntity entityQueried =
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
+ Assertions.assertEquals(scheduleEntity.getInlongGroupId(),
entityQueried.getInlongGroupId());
+ Assertions.assertEquals(SCHEDULE_TYPE,
entityQueried.getScheduleType());
+ Assertions.assertEquals(SCHEDULE_UNIT,
entityQueried.getScheduleUnit());
+ Assertions.assertEquals(SCHEDULE_INTERVAL,
entityQueried.getScheduleInterval());
+ Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime());
+ Assertions.assertEquals(DEFAULT_TIME, entityQueried.getEndTime());
+ Assertions.assertEquals(USER, entityQueried.getCreator());
+
+
scheduleEntityMapper.deleteByGroupId(scheduleEntity.getInlongGroupId());
+ entityQueried =
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
+ Assertions.assertNull(entityQueried);
+ }
+
+ private ScheduleEntity genEntity() {
+ ScheduleEntity entity = new ScheduleEntity();
+ entity.setInlongGroupId(GROUP_ID_PREFIX + System.currentTimeMillis());
+ entity.setScheduleType(SCHEDULE_TYPE);
+ entity.setScheduleUnit(SCHEDULE_UNIT);
+ entity.setScheduleInterval(SCHEDULE_INTERVAL);
+ entity.setStartTime(DEFAULT_TIME);
+ entity.setEndTime(DEFAULT_TIME);
+ entity.setCreator(USER);
+ entity.setCreateTime(new Date());
+ entity.setModifyTime(new Date());
+ entity.setIsDeleted(0);
+ return entity;
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
new file mode 100644
index 0000000000..13afce70a4
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule;
+
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+
+import java.sql.Timestamp;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Schedule response")
+public class ScheduleInfo {
+
+ @ApiModelProperty(value = "Primary key")
+ @NotNull(groups = UpdateValidation.class)
+ private Integer id;
+
+ @ApiModelProperty("Inlong Group ID")
+ @NotNull
+ private String inlongGroupId;
+
+ // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+ @ApiModelProperty("Schedule type")
+ private Integer scheduleType;
+
+ // time unit for offline task schedule interval, support [month, week,
day, hour, minute, oneway]
+ // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+ @ApiModelProperty("TimeUnit for schedule interval")
+ private String scheduleUnit;
+
+ @ApiModelProperty("Schedule interval")
+ private Integer scheduleInterval;
+
+ @ApiModelProperty("Start time")
+ private Timestamp startTime;
+
+ @ApiModelProperty("End time")
+ private Timestamp endTime;
+
+ @ApiModelProperty("Delay time")
+ private Integer delayTime;
+
+ @ApiModelProperty("Self depend")
+ private Integer selfDepend;
+
+ @ApiModelProperty("Schedule task parallelism")
+ private Integer taskParallelism;
+
+ @ApiModelProperty("Schedule task parallelism")
+ private Integer crontabExpression;
+
+ @ApiModelProperty(value = "Version number")
+ @NotNull(groups = UpdateValidation.class, message = "version cannot be
null")
+ private Integer version;
+
+}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
new file mode 100644
index 0000000000..eff1660719
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule;
+
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+
+import java.sql.Timestamp;
+
+@Data
+@ApiModel("Schedule request")
+public class ScheduleInfoRequest {
+
+ @ApiModelProperty(value = "Primary key")
+ @NotNull(groups = UpdateValidation.class)
+ private Integer id;
+
+ @ApiModelProperty("Inlong Group ID")
+ @NotNull
+ private String inlongGroupId;
+
+ // schedule type, support [normal, crontab], 0 for normal and 1 for crontab
+ @ApiModelProperty("Schedule type")
+ private Integer scheduleType;
+
+ // time unit for offline task schedule interval, support [month, week,
day, hour, minute, oneway]
+ // M=month, W=week, D=day, H=hour, M=minute, O=oneway
+ @ApiModelProperty("TimeUnit for schedule interval")
+ private String scheduleUnit;
+
+ @ApiModelProperty("Schedule interval")
+ private Integer scheduleInterval;
+
+ @ApiModelProperty("Start time")
+ private Timestamp startTime;
+
+ @ApiModelProperty("End time")
+ private Timestamp endTime;
+
+ @ApiModelProperty("Delay time")
+ private Integer delayTime;
+
+ @ApiModelProperty("Self depend")
+ private Integer selfDepend;
+
+ @ApiModelProperty("Schedule task parallelism")
+ private Integer taskParallelism;
+
+ @ApiModelProperty("Schedule task parallelism")
+ private Integer crontabExpression;
+
+ @ApiModelProperty(value = "Version number")
+ @NotNull(groups = UpdateValidation.class, message = "version cannot be
null")
+ private Integer version;
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
new file mode 100644
index 0000000000..d00e7134d4
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.schedule;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+
+public interface ScheduleService {
+
+ /**
+ * Save schedule info.
+ *
+ * @param request schedule request need to save
+ * @param operator name of operator
+ * @return schedule info id in backend storage
+ */
+ int save(@Valid @NotNull(message = "schedule request cannot be null")
ScheduleInfoRequest request,
+ String operator);
+
+ /**
+ * Query whether schedule info exists for specified inlong group
+ *
+ * @param groupId the group id to be queried
+ * @return does it exist
+ */
+ Boolean exist(String groupId);
+
+ /**
+ * Get schedule info based on inlong group id
+ *
+ * @param groupId inlong group id
+ * @return detail of inlong group
+ */
+ ScheduleInfo get(String groupId);
+
+ /**
+ * Modify schedule information
+ *
+ * @param request schedule request that needs to be modified
+ * @param operator name of operator
+ * @return whether succeed
+ */
+ Boolean update(@Valid @NotNull(message = "schedule request cannot be
null") ScheduleInfoRequest request,
+ String operator);
+
+ /**
+ * Delete schedule info for gropuId.
+ * @param groupId groupId to find a schedule info to delete
+ * @param operator name of operator
+ * @Return whether succeed
+ * */
+ Boolean deleteByGroupId(String groupId, String operator);
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
new file mode 100644
index 0000000000..480189da9e
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.schedule;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.ScheduleStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.ScheduleEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Objects;
+
+@Service
+public class ScheduleServiceImpl implements ScheduleService {
+
+ private static Logger LOGGER =
LoggerFactory.getLogger(ScheduleServiceImpl.class);
+
+ @Autowired
+ private InlongGroupEntityMapper groupMapper;
+ @Autowired
+ private ScheduleEntityMapper scheduleEntityMapper;
+
+ @Override
+ public int save(ScheduleInfoRequest request, String operator) {
+ LOGGER.debug("begin to save schedule info, scheduleInfo={},
operator={}", request, operator);
+
+ String groupId = request.getInlongGroupId();
+ checkGroupExist(groupId);
+ if (scheduleEntityMapper.selectByGroupId(groupId) != null) {
+ LOGGER.error("schedule info for group={} already exists", groupId);
+ throw new BusinessException(ErrorCodeEnum.SCHEDULE_DUPLICATE);
+ }
+
+ ScheduleEntity scheduleEntity =
CommonBeanUtils.copyProperties(request, ScheduleEntity::new);
+ scheduleEntity.setStatus(ScheduleStatus.NEW.getCode());
+ scheduleEntity.setCreator(operator);
+ scheduleEntity.setModifier(operator);
+ return scheduleEntityMapper.insert(scheduleEntity);
+ }
+
+ @Override
+ public Boolean exist(String groupId) {
+ checkGroupExist(groupId);
+ return scheduleEntityMapper.selectByGroupId(groupId) != null;
+ }
+
+ @Override
+ public ScheduleInfo get(String groupId) {
+ LOGGER.debug("begin to get schedule info by groupId={}", groupId);
+ ScheduleEntity entity = getScheduleEntity(groupId);
+ return CommonBeanUtils.copyProperties(entity, ScheduleInfo::new);
+ }
+
+ @Override
+ public Boolean update(ScheduleInfoRequest request, String operator) {
+ LOGGER.debug("begin to update schedule info={}", request);
+ String groupId = request.getInlongGroupId();
+ ScheduleEntity entity = getScheduleEntity(groupId);
+ String errMsg =
+ String.format("schedule info has already been updated with
groupId=%s, curVersion=%s, expectVersion=%s",
+ entity.getInlongGroupId(), request.getVersion(),
entity.getVersion());
+ if (!Objects.equals(entity.getVersion(), request.getVersion())) {
+ LOGGER.error(errMsg);
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+ CommonBeanUtils.copyProperties(request, entity, true);
+ entity.setModifier(operator);
+ updateScheduleInfo(entity, errMsg);
+ LOGGER.info("success to update schedule info for groupId={}", groupId);
+ return true;
+ }
+
+ @Override
+ public Boolean deleteByGroupId(String groupId, String operator) {
+ LOGGER.debug("begin to delete schedule info for groupId={}", groupId);
+ checkGroupExist(groupId);
+ ScheduleEntity entity = scheduleEntityMapper.selectByGroupId(groupId);
+ if (entity == null) {
+ LOGGER.error("schedule info for groupId={} does not exist",
groupId);
+ return false;
+ }
+ entity.setPreviousStatus(entity.getStatus());
+ entity.setStatus(ScheduleStatus.DELETED.getCode());
+ entity.setModifier(operator);
+ entity.setIsDeleted(entity.getId());
+ updateScheduleInfo(entity,
+ String.format("schedule info has already been updated with
groupId=%s, curVersion=%s",
+ entity.getInlongGroupId(), entity.getVersion()));
+ LOGGER.info("success to delete schedule info for groupId={}", groupId);
+ return true;
+ }
+
+ /**
+ * Check whether InLongGroup exists, throw BusinessException with
ErrorCodeEnum.GROUP_NOT_FOUND if check failed.
+ * */
+ private void checkGroupExist(String groupId) {
+ Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+ InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+ if (entity == null) {
+ LOGGER.error("inlong group not found by groupId={}", groupId);
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+ }
+
+ private ScheduleEntity getScheduleEntity(String groupId) {
+ checkGroupExist(groupId);
+ ScheduleEntity entity = scheduleEntityMapper.selectByGroupId(groupId);
+ if (entity == null) {
+ LOGGER.error("schedule info for group={} not found", groupId);
+ throw new BusinessException(ErrorCodeEnum.SCHEDULE_NOT_FOUND);
+ }
+ return entity;
+ }
+
+ /**
+ * Update schedule entity and throw exception if update failed.
+ * @param entity to update
+ * @param errorMsg when update failed.
+ * @return
+ *
+ * */
+ private void updateScheduleInfo(ScheduleEntity entity, String errorMsg) {
+ if (scheduleEntityMapper.updateByIdSelective(entity) !=
InlongConstants.AFFECTED_ONE_ROW) {
+ LOGGER.error(errorMsg);
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+ }
+}
diff --git
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 8f20196352..f5228cea4d 100644
---
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -957,4 +957,35 @@ CREATE TABLE IF NOT EXISTS `cluster_config`
-- ----------------------------
+-- ----------------------------
+-- Table structure for schedule_config
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `schedule_config`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id,
undeleted ones cannot be repeated',
+ `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT
'Schedule type, 0 for normal, 1 for crontab',
+ `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+ `schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule
interval',
+ `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Start time for schedule',
+ `end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'End time for schedule',
+ `delay_time` int(11) DEFAULT '0' COMMENT 'Delay time in
minutes to schedule',
+ `self_depend` int(11) DEFAULT NULL COMMENT 'Self depend
info',
+ `task_parallelism` int(11) DEFAULT NULL COMMENT 'Task
parallelism',
+ `crontab_expression` varchar(256) DEFAULT NULL COMMENT 'Crontab
expression if schedule type is crontab',
+ `status` int(4) DEFAULT '100' COMMENT 'Schedule
status',
+ `previous_status` int(4) DEFAULT '100' COMMENT 'Previous
schedule status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to
delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT
'Version number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
+ ) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+-- ----------------------------
+
+
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 41e55c5206..e3c9ec8eba 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1021,4 +1021,34 @@ CREATE TABLE IF NOT EXISTS `cluster_config`
DEFAULT CHARSET = utf8mb4 COMMENT = 'cluster_config';
-- ----------------------------
+-- ----------------------------
+-- Table structure for schedule_config
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `schedule_config`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id,
undeleted ones cannot be repeated',
+ `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT
'Schedule type, 0 for normal, 1 for crontab',
+ `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+ `schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule
interval',
+ `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Start time for schedule',
+ `end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'End time for schedule',
+ `delay_time` int(11) DEFAULT '0' COMMENT 'Delay time in
minutes to schedule',
+ `self_depend` int(11) DEFAULT NULL COMMENT 'Self depend
info',
+ `task_parallelism` int(11) DEFAULT NULL COMMENT 'Task
parallelism',
+ `crontab_expression` varchar(256) DEFAULT NULL COMMENT 'Crontab
expression if schedule type is crontab',
+ `status` int(4) DEFAULT '100' COMMENT 'Schedule
status',
+ `previous_status` int(4) DEFAULT '100' COMMENT 'Previous
schedule status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to
delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT
'Version number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+-- ----------------------------
+
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/changes-1.13.0.sql
b/inlong-manager/manager-web/sql/changes-1.13.0.sql
index 7013c2990f..4734495338 100644
--- a/inlong-manager/manager-web/sql/changes-1.13.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.13.0.sql
@@ -26,3 +26,33 @@ USE `apache_inlong_manager`;
ALTER TABLE `inlong_cluster_node` ADD COLUMN `username` varchar(256) DEFAULT
NULL COMMENT 'username for ssh';
ALTER TABLE `inlong_cluster_node` ADD COLUMN `password` varchar(256) DEFAULT
NULL COMMENT 'password for ssh';
ALTER TABLE `inlong_cluster_node` ADD COLUMN `ssh_port` int(11) DEFAULT NULL
COMMENT 'ssh port';
+
+-- ----------------------------
+-- Table structure for schedule_config
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `schedule_config`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong
group id, undeleted ones cannot be repeated',
+ `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT
'Schedule type, 0 for normal, 1 for crontab',
+ `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule
unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway',
+ `schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule
interval',
+ `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Start time for schedule',
+ `end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'End time for schedule',
+ `delay_time` int(11) DEFAULT '0' COMMENT 'Delay time in
minutes to schedule',
+ `self_depend` int(11) DEFAULT NULL COMMENT 'Self depend
info',
+ `task_parallelism` int(11) DEFAULT NULL COMMENT 'Task
parallelism',
+ `crontab_expression` varchar(256) DEFAULT NULL COMMENT 'Crontab
expression if schedule type is crontab',
+ `status` int(4) DEFAULT '100' COMMENT 'Schedule
status',
+ `previous_status` int(4) DEFAULT '100' COMMENT 'Previous
schedule status',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to
delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT
'Version number, which will be incremented by 1 after modification',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_group_schedule_config` (`inlong_group_id`, `is_deleted`)
+ ) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT = 'schedule_config';
+-- ----------------------------
\ No newline at end of file
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
new file mode 100644
index 0000000000..801cc09b0e
--- /dev/null
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import org.apache.inlong.manager.common.enums.OperationTarget;
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+import org.apache.inlong.manager.pojo.user.LoginUserUtils;
+import org.apache.inlong.manager.service.operationlog.OperationLog;
+import org.apache.inlong.manager.service.schedule.ScheduleService;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/api")
+@Api(tags = "Inlong-Schedule-API")
+public class InLongSchedulerController {
+
+ @Autowired
+ private ScheduleService scheduleService;
+
+ @RequestMapping(value = "/schedule/save", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.CREATE, operationTarget =
OperationTarget.SCHEDULE)
+ @ApiOperation(value = "Save schedule info")
+ public Response<Integer> save(@RequestBody ScheduleInfoRequest request) {
+ int result = scheduleService.save(request,
LoginUserUtils.getLoginUser().getName());
+ return Response.success(result);
+ }
+
+ @RequestMapping(value = "/schedule/exist/{groupId}", method =
RequestMethod.GET)
+ @ApiOperation(value = "Is the schedule info exists for inlong group")
+ @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required
= true)
+ public Response<Boolean> exist(@PathVariable String groupId) {
+ return Response.success(scheduleService.exist(groupId));
+ }
+
+ @RequestMapping(value = "/schedule/get", method = RequestMethod.GET)
+ @ApiOperation(value = "Get schedule info for inlong group")
+ @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required
= true)
+ public Response<ScheduleInfo> get(@RequestParam String groupId) {
+ return Response.success(scheduleService.get(groupId));
+ }
+
+ @RequestMapping(value = "/schedule/update", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.UPDATE, operationTarget =
OperationTarget.SCHEDULE)
+ @ApiOperation(value = "Update schedule info")
+ public Response<Boolean> update(@Validated(UpdateValidation.class)
@RequestBody ScheduleInfoRequest request) {
+ return Response.success(scheduleService.update(request,
LoginUserUtils.getLoginUser().getName()));
+ }
+
+ @RequestMapping(value = "/schedule/delete/{groupId}", method =
RequestMethod.DELETE)
+ @ApiOperation(value = "Delete schedule info")
+ @OperationLog(operation = OperationType.DELETE, operationTarget =
OperationTarget.SCHEDULE)
+ @ApiImplicitParam(name = "groupId", value = "Inlong group id",
dataTypeClass = String.class, required = true)
+ public Response<Boolean> delete(@PathVariable String groupId) {
+ String operator = LoginUserUtils.getLoginUser().getName();
+ return Response.success(scheduleService.deleteByGroupId(groupId,
operator));
+ }
+
+}