This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git
commit e3beb8c8456e9ad5f9c6d4918c0ecd6d18623d3d Author: AloysZhang <[email protected]> AuthorDate: Wed Jun 19 15:41:34 2024 +0800 [INLONG-10360][Manager] Combine schedule state transition with group operations (#10445) Co-authored-by: fuweng11 <[email protected]> --- .../inlong/manager/common/enums/ErrorCodeEnum.java | 2 + .../manager/common/enums/ScheduleStatus.java | 16 +- .../inlong/manager/pojo/group/InlongGroupInfo.java | 31 ++++ .../manager/pojo/group/InlongGroupRequest.java | 31 ++++ .../inlong/manager/pojo/schedule/ScheduleInfo.java | 29 ++++ .../manager/pojo/schedule/ScheduleInfoRequest.java | 28 ++++ ...leEngineClient.java => NoopScheduleClient.java} | 45 +++--- .../manager/schedule/ScheduleClientFactory.java | 55 +++++++ .../inlong/manager/schedule/ScheduleEngine.java | 4 +- .../manager/schedule/ScheduleEngineClient.java | 12 +- .../manager/schedule/ScheduleEngineType.java} | 28 +--- .../schedule/quartz/QuartzScheduleClient.java | 17 +- .../schedule/quartz/QuartzScheduleEngine.java | 21 +-- .../manager/schedule/util/ScheduleUtils.java | 4 +- .../inlong/manager/schedule/quartz/MockJob.java | 3 +- .../schedule/quartz/QuartzScheduleEngineTest.java | 2 +- .../manager/schedule/util/ScheduleUtilsTest.java | 8 +- inlong-manager/manager-service/pom.xml | 5 + .../service/group/InlongGroupServiceImpl.java | 57 ++++++- .../schedule/GroupScheduleResourceListener.java | 8 +- .../manager/service/schedule/ScheduleOperator.java | 95 +++++++++++ .../service/schedule/ScheduleOperatorImpl.java | 175 +++++++++++++++++++++ .../manager/service/schedule/ScheduleService.java | 11 ++ .../service/schedule/ScheduleServiceImpl.java | 54 ++++++- .../web/controller/InLongSchedulerController.java | 24 ++- .../src/main/resources/application-dev.properties | 6 +- .../src/main/resources/application-prod.properties | 3 + .../src/main/resources/application-test.properties | 3 + .../src/main/resources/application.properties | 4 + 29 files changed, 698 insertions(+), 83 deletions(-) diff 
--git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java index 6a8f9b4699..2a82325ad9 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java @@ -127,6 +127,8 @@ public enum ErrorCodeEnum { SCHEDULE_NOT_FOUND(1700, "Schedule info not found"), SCHEDULE_DUPLICATE(1701, "Schedule info already exist"), + SCHEDULE_ENGINE_NOT_SUPPORTED(1702, "Schedule engine type not supported"), + SCHEDULE_STATUS_TRANSITION_NOT_ALLOWED(1703, "Schedule status transition is not allowed"), WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"), WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no operation authority"), diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java index cb256a491c..2d936b5532 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java @@ -19,11 +19,25 @@ package org.apache.inlong.manager.common.enums; import lombok.Getter; +/** + * Status for schedule info. + * This is the transient status of the schedule info. + * With specified operations, the status will change to corresponding value. 
+ * Status Operations + * NEW inlong group created with schedule info + * APPROVED the new inlong group approved by admin + * REGISTERED schedule info registered to schedule engine + * UPDATED update schedule info for a group + * DELETED delete a group + * */ @Getter public enum ScheduleStatus { NEW(100, "new"), - DELETED(40, "deleted"); + APPROVED(101, "approved"), + REGISTERED(102, "registered"), + UPDATED(103, "updated"), + DELETED(99, "deleted"); private final Integer code; private final String description; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java index 17b66497a9..0fb0479196 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java @@ -30,6 +30,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import java.sql.Timestamp; import java.util.Date; import java.util.List; @@ -137,6 +138,36 @@ public abstract class InlongGroupInfo extends BaseInlongGroup { @ApiModelProperty(value = "Inlong tenant") private String tenant; + // schedule type, support [normal, crontab], 0 for normal and 1 for crontab + @ApiModelProperty("Schedule type") + private Integer scheduleType; + + // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway] + // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway + @ApiModelProperty("TimeUnit for schedule interval") + private String scheduleUnit; + + @ApiModelProperty("Schedule interval") + private Integer scheduleInterval; + + @ApiModelProperty("Start time") + private Timestamp startTime; + + @ApiModelProperty("End time") + private Timestamp endTime; + + @ApiModelProperty("Delay time") + private Integer delayTime; + + 
@ApiModelProperty("Self depend") + private Integer selfDepend; + + @ApiModelProperty("Schedule task parallelism") + private Integer taskParallelism; + + @ApiModelProperty("Crontab expression") + private String crontabExpression; + public abstract InlongGroupRequest genRequest(); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java index 6140bddad5..1adc210a0a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java @@ -34,6 +34,7 @@ import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import javax.validation.constraints.Pattern; +import java.sql.Timestamp; import java.util.List; /** @@ -130,4 +131,34 @@ public abstract class InlongGroupRequest extends BaseInlongGroup { @NotNull(groups = UpdateValidation.class, message = "version cannot be null") private Integer version; + // schedule type, support [normal, crontab], 0 for normal and 1 for crontab + @ApiModelProperty("Schedule type") + private Integer scheduleType; + + // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway] + // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway + @ApiModelProperty("TimeUnit for schedule interval") + private String scheduleUnit; + + @ApiModelProperty("Schedule interval") + private Integer scheduleInterval; + + @ApiModelProperty("Start time") + private Timestamp startTime; + + @ApiModelProperty("End time") + private Timestamp endTime; + + @ApiModelProperty("Delay time") + private Integer delayTime; + + @ApiModelProperty("Self depend") + private Integer selfDepend; + + @ApiModelProperty("Schedule task parallelism") + private Integer taskParallelism; + + 
@ApiModelProperty("Crontab expression") + private String crontabExpression; + } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java index 2386d817bb..24f2e6196e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java @@ -29,6 +29,7 @@ import lombok.NoArgsConstructor; import javax.validation.constraints.NotNull; import java.sql.Timestamp; +import java.util.Objects; @Data @Builder @@ -79,4 +80,32 @@ public class ScheduleInfo { @NotNull(groups = UpdateValidation.class, message = "version cannot be null") private Integer version; + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ScheduleInfo that = (ScheduleInfo) o; + return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId) + && Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit, + that.scheduleUnit) + && Objects.equals(scheduleInterval, that.scheduleInterval) + && Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime) + && Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend, + that.selfDepend) + && Objects.equals(taskParallelism, that.taskParallelism) + && Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version, + that.version); + } + + @Override + public int hashCode() { + return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, scheduleInterval, startTime, endTime, + delayTime, + selfDepend, taskParallelism, crontabExpression, version); + } + } \ No newline at end of file diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java index b3c117da9a..a324cee7f5 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java @@ -26,6 +26,7 @@ import lombok.Data; import javax.validation.constraints.NotNull; import java.sql.Timestamp; +import java.util.Objects; @Data @ApiModel("Schedule request") @@ -73,4 +74,31 @@ public class ScheduleInfoRequest { @NotNull(groups = UpdateValidation.class, message = "version cannot be null") private Integer version; + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ScheduleInfoRequest that = (ScheduleInfoRequest) o; + return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, that.inlongGroupId) + && Objects.equals(scheduleType, that.scheduleType) && Objects.equals(scheduleUnit, + that.scheduleUnit) + && Objects.equals(scheduleInterval, that.scheduleInterval) + && Objects.equals(startTime, that.startTime) && Objects.equals(endTime, that.endTime) + && Objects.equals(delayTime, that.delayTime) && Objects.equals(selfDepend, + that.selfDepend) + && Objects.equals(taskParallelism, that.taskParallelism) + && Objects.equals(crontabExpression, that.crontabExpression) && Objects.equals(version, + that.version); + } + + @Override + public int hashCode() { + return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit, scheduleInterval, startTime, endTime, + delayTime, + selfDepend, taskParallelism, crontabExpression, version); + } } diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java similarity index 58% copy from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java copy to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java index dee5cbb2db..a122235de3 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java @@ -19,27 +19,28 @@ package org.apache.inlong.manager.schedule; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; -/** - * Interface for schedule engine client which responses for communicating with schedule engine. - * */ -public interface ScheduleEngineClient { - - /** - * Register schedule to schedule engine. - * @param scheduleInfo schedule info to register - * */ - boolean register(ScheduleInfo scheduleInfo); - - /** - * Un-register schedule from schedule engine. - * @param scheduleInfo schedule info to unregister - * */ - boolean unregister(ScheduleInfo scheduleInfo); - - /** - * Update schedule from schedule engine. 
- * @param scheduleInfo schedule info to update - * */ - boolean update(ScheduleInfo scheduleInfo); +import org.springframework.stereotype.Service; +@Service +public class NoopScheduleClient implements ScheduleEngineClient { + + @Override + public boolean accept(String engineType) { + return ScheduleEngineType.NONE.getType().equalsIgnoreCase(engineType); + } + + @Override + public boolean register(ScheduleInfo scheduleInfo) { + return true; + } + + @Override + public boolean unregister(String groupId) { + return true; + } + + @Override + public boolean update(ScheduleInfo scheduleInfo) { + return true; + } } diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java new file mode 100644 index 0000000000..13f87b3c45 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Optional; + +@Service +public class ScheduleClientFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleClientFactory.class); + + @Value("${inlong.schedule.engine:none}") + private String scheduleEngineName; + + @Autowired + List<ScheduleEngineClient> scheduleEngineClients; + + public ScheduleEngineClient getInstance() { + Optional<ScheduleEngineClient> optScheduleClient = + scheduleEngineClients.stream().filter(t -> t.accept(scheduleEngineName)).findFirst(); + if (!optScheduleClient.isPresent()) { + LOGGER.warn("Schedule engine client not found for {} ", scheduleEngineName); + throw new BusinessException(ErrorCodeEnum.SCHEDULE_ENGINE_NOT_SUPPORTED, + String.format("%s: %s", ErrorCodeEnum.SCHEDULE_ENGINE_NOT_SUPPORTED.getMessage(), scheduleEngineName)); + } + LOGGER.info("Get schedule engine client success for {}", scheduleEngineName); + return optScheduleClient.get(); + } + +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java index 1f52e280de..bc0c963b38 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java @@ -37,9 +37,9 @@ public interface ScheduleEngine { /** * Handle schedule unregister. 
- * @param scheduleInfo schedule info to unregister + * @param groupId id of the group whose schedule info is un-registered * */ - boolean handleUnregister(ScheduleInfo scheduleInfo); + boolean handleUnregister(String groupId); /** * Handle schedule update. diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java index dee5cbb2db..9c6cf081d4 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java @@ -24,6 +24,11 @@ import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; * */ public interface ScheduleEngineClient { + /** + * Check whether this client handles the given schedule engine type. + * */ + boolean accept(String engineType); + /** * Register schedule to schedule engine. * @param scheduleInfo schedule info to register @@ -32,9 +37,10 @@ public interface ScheduleEngineClient { /** * Un-register schedule from schedule engine. - * @param scheduleInfo schedule info to unregister - * */ - boolean unregister(ScheduleInfo scheduleInfo); + * + * @param groupId id of the group whose schedule info is un-registered + */ + boolean unregister(String groupId); /** * Update schedule from schedule engine. 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java similarity index 57% copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java copy to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java index cb256a491c..71949ef744 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ScheduleStatus.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java @@ -15,31 +15,19 @@ * limitations under the License. */ -package org.apache.inlong.manager.common.enums; +package org.apache.inlong.manager.schedule; import lombok.Getter; @Getter -public enum ScheduleStatus { +public enum ScheduleEngineType { - NEW(100, "new"), - DELETED(40, "deleted"); + NONE("None"), + QUARTZ("Quartz"); - private final Integer code; - private final String description; + private final String type; - ScheduleStatus(Integer code, String description) { - this.code = code; - this.description = description; + ScheduleEngineType(String type) { + this.type = type; } - - public static ScheduleStatus forCode(int code) { - for (ScheduleStatus status : values()) { - if (status.getCode() == code) { - return status; - } - } - throw new IllegalStateException(String.format("Illegal code=%s for ScheduleStatus", code)); - } - -} +} \ No newline at end of file diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java index 05aa6c01ae..6b7afe784f 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java 
+++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java @@ -19,18 +19,25 @@ package org.apache.inlong.manager.schedule.quartz; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.schedule.ScheduleEngineClient; +import org.apache.inlong.manager.schedule.ScheduleEngineType; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; /** * Built-in implementation of schedule engine client corresponding with {@link QuartzScheduleEngine}. * QuartzScheduleClient simply invokes the {@link QuartzScheduleEngine} to register/unregister/update * schedule info instead of calling a remote schedule service. * */ +@Service public class QuartzScheduleClient implements ScheduleEngineClient { - private final QuartzScheduleEngine scheduleEngine; + @Autowired + public QuartzScheduleEngine scheduleEngine; - public QuartzScheduleClient(QuartzScheduleEngine scheduleEngine) { - this.scheduleEngine = scheduleEngine; + @Override + public boolean accept(String engineType) { + return ScheduleEngineType.QUARTZ.getType().equalsIgnoreCase(engineType); } @Override @@ -39,8 +46,8 @@ public class QuartzScheduleClient implements ScheduleEngineClient { } @Override - public boolean unregister(ScheduleInfo scheduleInfo) { - return scheduleEngine.handleUnregister(scheduleInfo); + public boolean unregister(String groupId) { + return scheduleEngine.handleUnregister(groupId); } @Override diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java index d9d2620211..e8bb1085ce 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java +++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.schedule.exception.QuartzScheduleException; import com.google.common.annotations.VisibleForTesting; import lombok.Getter; +import org.quartz.Job; import org.quartz.JobDetail; import org.quartz.JobKey; import org.quartz.Scheduler; @@ -31,6 +32,7 @@ import org.quartz.Trigger; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; import java.util.HashSet; import java.util.Set; @@ -43,6 +45,7 @@ import static org.apache.inlong.manager.schedule.util.ScheduleUtils.genQuartzTri * the register/unregister/update requests from {@link QuartzScheduleClient} * */ @Getter +@Service public class QuartzScheduleEngine implements ScheduleEngine { private static final Logger LOGGER = LoggerFactory.getLogger(QuartzScheduleEngine.class); @@ -90,7 +93,7 @@ public class QuartzScheduleEngine implements ScheduleEngine { } @VisibleForTesting - public boolean handleRegister(ScheduleInfo scheduleInfo, Class<? extends QuartzOfflineSyncJob> clz) { + public boolean handleRegister(ScheduleInfo scheduleInfo, Class<? extends Job> clz) { if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) { throw new QuartzScheduleException("Group " + scheduleInfo.getInlongGroupId() + " is already registered"); } @@ -108,19 +111,19 @@ public class QuartzScheduleEngine implements ScheduleEngine { /** * Handle schedule unregister. 
- * @param scheduleInfo schedule info to unregister + * @param groupId group to un-register schedule info * */ @Override - public boolean handleUnregister(ScheduleInfo scheduleInfo) { - if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) { + public boolean handleUnregister(String groupId) { + if (scheduledJobSet.contains(groupId)) { try { - scheduler.deleteJob(new JobKey(scheduleInfo.getInlongGroupId())); + scheduler.deleteJob(new JobKey(groupId)); } catch (SchedulerException e) { throw new QuartzScheduleException(e.getMessage()); } } - scheduledJobSet.remove(scheduleInfo.getInlongGroupId()); - LOGGER.info("Un-registered schedule info for {}", scheduleInfo.getInlongGroupId()); + scheduledJobSet.remove(groupId); + LOGGER.info("Un-registered schedule info for {}", groupId); return true; } @@ -134,8 +137,8 @@ public class QuartzScheduleEngine implements ScheduleEngine { } @VisibleForTesting - public boolean handleUpdate(ScheduleInfo scheduleInfo, Class<? extends QuartzOfflineSyncJob> clz) { - handleUnregister(scheduleInfo); + public boolean handleUpdate(ScheduleInfo scheduleInfo, Class<? 
extends Job> clz) { + handleUnregister(scheduleInfo.getInlongGroupId()); handleRegister(scheduleInfo, clz); LOGGER.info("Updated schedule info for {}", scheduleInfo.getInlongGroupId()); return false; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java index 65475a0b98..1e4f43983e 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java @@ -21,11 +21,11 @@ import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.schedule.ScheduleType; import org.apache.inlong.manager.schedule.ScheduleUnit; import org.apache.inlong.manager.schedule.exception.QuartzScheduleException; -import org.apache.inlong.manager.schedule.quartz.QuartzOfflineSyncJob; import org.apache.commons.lang3.StringUtils; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; +import org.quartz.Job; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.ScheduleBuilder; @@ -42,7 +42,7 @@ import java.util.Date; * */ public class ScheduleUtils { - public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo, Class<? extends QuartzOfflineSyncJob> clz) { + public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo, Class<? 
extends Job> clz) { return JobBuilder.newJob(clz) .withIdentity(scheduleInfo.getInlongGroupId()) .build(); diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java index 9202ea5b40..bc0fecf99a 100644 --- a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.schedule.quartz; +import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.slf4j.Logger; @@ -25,7 +26,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -public class MockJob extends QuartzOfflineSyncJob { +public class MockJob implements Job { private static final Logger LOGGER = LoggerFactory.getLogger(MockJob.class); diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java index 008a7e42f8..11e7580f28 100644 --- a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java @@ -103,7 +103,7 @@ public class QuartzScheduleEngineTest extends BaseScheduleTest { MockJob.countDownLatch.await(); // un-register before trigger finalized - scheduleEngine.handleUnregister(scheduleInfo); + scheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId()); // not job exist after un-register assertEquals(0, scheduleEngine.getScheduledJobSet().size()); exist = 
scheduleEngine.getScheduler().checkExists(jobKey); diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java index 415331be3d..da8fd66c61 100644 --- a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java @@ -20,7 +20,7 @@ package org.apache.inlong.manager.schedule.util; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.schedule.BaseScheduleTest; import org.apache.inlong.manager.schedule.exception.QuartzScheduleException; -import org.apache.inlong.manager.schedule.quartz.QuartzOfflineSyncJob; +import org.apache.inlong.manager.schedule.quartz.MockJob; import org.junit.jupiter.api.Test; import org.quartz.CronScheduleBuilder; @@ -102,7 +102,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest { @Test public void testGenJobDetail() { ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); - JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, QuartzOfflineSyncJob.class); + JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, MockJob.class); assertNotNull(jobDetail); JobKey jobKey = jobDetail.getKey(); @@ -116,7 +116,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest { public void testGenCronTrigger() { // normal ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); - JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, QuartzOfflineSyncJob.class); + JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, MockJob.class); Trigger trigger = ScheduleUtils.genQuartzTrigger(jobDetail, scheduleInfo); assertNotNull(trigger); @@ -139,7 +139,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest { // cron 
scheduleInfo = genDefaultCronScheduleInfo(); - jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, QuartzOfflineSyncJob.class); + jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, MockJob.class); trigger = ScheduleUtils.genQuartzTrigger(jobDetail, scheduleInfo); assertNotNull(trigger); diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml index 1b300dfc34..ae42b902b7 100644 --- a/inlong-manager/manager-service/pom.xml +++ b/inlong-manager/manager-service/pom.xml @@ -52,6 +52,11 @@ <artifactId>manager-workflow</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>manager-schedule</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.inlong</groupId> <artifactId>manager-test</artifactId> diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index c9f4019a4d..d42a61dbfb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -59,6 +59,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest; import org.apache.inlong.manager.pojo.group.InlongGroupRequest; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.sort.BaseSortConf; import org.apache.inlong.manager.pojo.sort.BaseSortConf.SortType; @@ -72,6 
+74,7 @@ import org.apache.inlong.manager.pojo.user.LoginUserUtils; import org.apache.inlong.manager.pojo.user.UserInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.service.cluster.InlongClusterService; +import org.apache.inlong.manager.service.schedule.ScheduleOperator; import org.apache.inlong.manager.service.sink.StreamSinkService; import org.apache.inlong.manager.service.source.SourceOperatorFactory; import org.apache.inlong.manager.service.source.StreamSourceOperator; @@ -111,6 +114,7 @@ import static org.apache.inlong.common.constant.ClusterSwitch.BACKUP_CLUSTER_TAG import static org.apache.inlong.common.constant.ClusterSwitch.BACKUP_MQ_RESOURCE; import static org.apache.inlong.common.constant.ClusterSwitch.CLUSTER_SWITCH_TIME; import static org.apache.inlong.common.constant.ClusterSwitch.FINISH_SWITCH_INTERVAL_MIN; +import static org.apache.inlong.manager.common.consts.InlongConstants.DATASYNC_OFFLINE_MODE; import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE; import static org.apache.inlong.manager.workflow.event.process.ProcessEventListener.EXECUTOR_SERVICE; @@ -158,6 +162,9 @@ public class InlongGroupServiceImpl implements InlongGroupService { @Autowired private TenantUserRoleEntityMapper tenantUserRoleEntityMapper; + @Autowired + ScheduleOperator scheduleOperator; + /** * Check whether modification is supported under the current group status, and which fields can be modified. 
* @@ -208,6 +215,11 @@ public class InlongGroupServiceImpl implements InlongGroupService { // save ext info this.saveOrUpdateExt(groupId, request.getExtList()); + // save schedule info for offline group + if (DATASYNC_OFFLINE_MODE.equals(request.getInlongGroupMode())) { + scheduleOperator.saveOpt(CommonBeanUtils.copyProperties(request, ScheduleInfoRequest::new), operator); + } + LOGGER.info("success to save inlong group for groupId={} by user={}", groupId, operator); return groupId; } @@ -239,7 +251,15 @@ public class InlongGroupServiceImpl implements InlongGroupService { Preconditions.expectNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); InlongGroupEntity entity = groupMapper.selectByGroupId(groupId); LOGGER.debug("success to check inlong group {}, exist? {}", groupId, entity != null); - return entity != null; + if (entity == null) { + return false; + } + return isScheduleInfoExist(entity); + } + + private boolean isScheduleInfoExist(InlongGroupEntity entity) { + return DATASYNC_OFFLINE_MODE.equals(entity.getInlongGroupMode()) + && scheduleOperator.scheduleInfoExist(entity.getInlongGroupId()); } @Override @@ -261,11 +281,29 @@ public class InlongGroupServiceImpl implements InlongGroupService { List<InlongStreamExtEntity> streamExtEntities = streamExtMapper.selectByRelatedId(groupId, null); BaseSortConf sortConf = buildSortConfig(streamExtEntities); groupInfo.setSortConf(sortConf); - + if (DATASYNC_OFFLINE_MODE.equals(entity.getInlongGroupMode())) { + // get schedule info and set into group info + addScheduleInfo(entity, groupInfo); + } LOGGER.debug("success to get inlong group for groupId={}", groupId); return groupInfo; } + private void addScheduleInfo(InlongGroupEntity entity, InlongGroupInfo groupInfo) { + checkOfflineSyncScheduleExist(entity); + ScheduleInfo scheduleInfo = scheduleOperator.getScheduleInfo(entity.getInlongGroupId()); + CommonBeanUtils.copyProperties(scheduleInfo, groupInfo); + } + + private void 
checkOfflineSyncScheduleExist(InlongGroupEntity entity) { + // check schedule info for offline sync + if (!isScheduleInfoExist(entity)) { + String errorMsg = String.format("Schedule info not found for groupId=%s", entity.getInlongGroupId()); + LOGGER.error(errorMsg); + throw new BusinessException(ErrorCodeEnum.SCHEDULE_NOT_FOUND, errorMsg); + } + } + @Override public String getTenant(String groupId, String operator) { InlongGroupEntity groupEntity = groupMapper.selectByGroupIdWithoutTenant(groupId); @@ -457,6 +495,12 @@ public class InlongGroupServiceImpl implements InlongGroupService { // save ext info this.saveOrUpdateExt(groupId, request.getExtList()); + // save schedule info for offline group + if (DATASYNC_OFFLINE_MODE.equals(request.getInlongGroupMode())) { + scheduleOperator.updateAndRegister(CommonBeanUtils.copyProperties(request, ScheduleInfoRequest::new), + operator); + } + LOGGER.info("success to update inlong group for groupId={} by user={}", groupId, operator); return groupId; } @@ -612,6 +656,15 @@ public class InlongGroupServiceImpl implements InlongGroupService { // logically delete the associated extension info groupExtMapper.logicDeleteAllByGroupId(groupId); + // remove schedule + if (DATASYNC_OFFLINE_MODE.equals(entity.getInlongGroupMode())) { + try { + scheduleOperator.deleteByGroupIdOpt(entity.getInlongGroupId(), operator); + } catch (Exception e) { + LOGGER.warn("failed to delete schedule info for groupId={}, error msg: {}", groupId, e.getMessage()); + } + } + LOGGER.info("success to delete group and group ext property for groupId={} by user={}", groupId, operator); return true; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java index e27dab9e89..39d3a2a3bf 100644 --- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java @@ -24,11 +24,13 @@ import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; +import org.apache.inlong.manager.service.schedule.ScheduleOperator; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.event.ListenerResult; import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; @@ -37,6 +39,9 @@ import java.util.List; @Slf4j public class GroupScheduleResourceListener implements ScheduleOperateListener { + @Autowired + private ScheduleOperator scheduleOperator; + @Override public TaskEvent event() { return TaskEvent.COMPLETE; @@ -68,7 +73,8 @@ public class GroupScheduleResourceListener implements ScheduleOperateListener { final String groupId = groupInfo.getInlongGroupId(); log.info("begin to register schedule info for groupId={}", groupId); - // todo: register schedule info to schedule service + // handle schedule info after group approved + scheduleOperator.handleGroupApprove(groupId); // after register schedule info successfully, add ext property to group ext info saveInfo(groupInfo, InlongConstants.REGISTER_SCHEDULE_STATUS, diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java new file mode 100644 index 0000000000..6bf9c01432 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.schedule; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; + +/** + * Operator for schedule. Including: + * 1. schedule info management + * 2. schedule operations like register, un-register etc. + * */ +public interface ScheduleOperator { + + /** + * Save schedule info. + * There are two places that may save schedule info: + * - 1. create new inlong group with schedule info + * - 2. create new schedule info directly(inlong group already exists), in this situation, we should + * register schedule info to schedule engine if group has been approved.
+ * @param request schedule request to save + * @param operator name of operator + * @return schedule info id in backend storage + */ + int saveOpt(ScheduleInfoRequest request, String operator); + + /** + * Check whether schedule info exists for specified inlong group + * + * @param groupId the group id to be queried + * @return whether it exists + */ + Boolean scheduleInfoExist(String groupId); + + /** + * Get schedule info based on inlong group id + * + * @param groupId inlong group id + * @return detail of schedule info + */ + ScheduleInfo getScheduleInfo(String groupId); + + /** + * Modify schedule information + * There are two places that may update schedule info: + * - 1. update inlong group with new schedule info + * - 2. update schedule info directly(inlong group already exists) + * @param request schedule request that needs to be modified + * @param operator name of operator + * @return whether succeed + */ + Boolean updateOpt(ScheduleInfoRequest request, String operator); + + /** + * Update schedule information and register it to schedule engine + * @param request schedule request that needs to be modified + * @param operator name of operator + * @return whether succeed + */ + Boolean updateAndRegister(ScheduleInfoRequest request, String operator); + + /** + * Delete schedule info for groupId. + * There are two places that may delete schedule info: + * - 1. delete an inlong group + * - 2. delete schedule info directly, leaving the inlong group alone without schedule info, which means the group of + * offline sync job will never be triggered + * @param groupId groupId to find a schedule info to delete + * @param operator name of operator + * @return whether succeed + * */ + Boolean deleteByGroupIdOpt(String groupId, String operator); + + /** + * Handle inlong group approve, check schedule info and try to register it to schedule engine.
+ * @param groupId groupId to find a schedule info to register + * @return whether succeed + * */ + Boolean handleGroupApprove(String groupId); +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java new file mode 100644 index 0000000000..411a80766a --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.inlong.manager.service.schedule; + +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity; +import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper; +import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; +import org.apache.inlong.manager.schedule.ScheduleClientFactory; +import org.apache.inlong.manager.schedule.ScheduleEngineClient; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import static org.apache.inlong.manager.common.enums.ScheduleStatus.APPROVED; +import static org.apache.inlong.manager.common.enums.ScheduleStatus.REGISTERED; +import static org.apache.inlong.manager.common.enums.ScheduleStatus.UPDATED; + +@Service +public class ScheduleOperatorImpl implements ScheduleOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleOperatorImpl.class); + + @Autowired + private ScheduleService scheduleService; + + @Autowired + private InlongGroupExtEntityMapper groupExtMapper; + + @Autowired + private ScheduleEntityMapper scheduleMapper; + + @Autowired + private ScheduleClientFactory scheduleClientFactory; + + private ScheduleEngineClient scheduleEngineClient; + + @Override + @Transactional(rollbackFor = Throwable.class) + public int saveOpt(ScheduleInfoRequest request, String operator) { + // save schedule info first + int scheduleInfoId = scheduleService.save(request, operator); + LOGGER.info("Save schedule info success for group {}", request.getInlongGroupId()); + // process new schedule info for approved inlong group + 
registerScheduleInfoForApprovedGroup(CommonBeanUtils.copyProperties(request, ScheduleInfo::new), operator); + return scheduleInfoId; + } + + /** + * If an inlong group in DATASYNC_OFFLINE_MODE created first without schedule info and has been approved, it should + * be registered to schedule engine once the schedule info for this group is added. + * */ + private void registerScheduleInfoForApprovedGroup(ScheduleInfo scheduleInfo, String operator) { + String groupId = scheduleInfo.getInlongGroupId(); + InlongGroupExtEntity scheduleStatusExt = + groupExtMapper.selectByUniqueKey(groupId, InlongConstants.REGISTER_SCHEDULE_STATUS); + if (InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) { + // change schedule state to approved + scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), APPROVED, operator); + registerToScheduleEngine(scheduleInfo, operator, false); + LOGGER.info("Register schedule info success for group {}", groupId); + } + } + + private ScheduleEngineClient getScheduleEngineClient() { + if (scheduleEngineClient == null) { + scheduleEngineClient = scheduleClientFactory.getInstance(); + } + return scheduleEngineClient; + } + + @Override + public Boolean scheduleInfoExist(String groupId) { + return scheduleService.exist(groupId); + } + + @Override + public ScheduleInfo getScheduleInfo(String groupId) { + return scheduleService.get(groupId); + } + + @Override + @Transactional(rollbackFor = Throwable.class) + public Boolean updateOpt(ScheduleInfoRequest request, String operator) { + // if the inlong group exist without schedule info + // then, save the new schedule info when updating inlong group + if (!scheduleInfoExist(request.getInlongGroupId())) { + saveOpt(request, operator); + return true; + } + ScheduleInfo scheduleInfo = CommonBeanUtils.copyProperties(request, ScheduleInfo::new); + if (!needUpdate(scheduleInfo)) { + LOGGER.info("schedule info not changed for group {}", request.getInlongGroupId()); + return false; + } + 
// update schedule info + boolean res = scheduleService.update(request, operator); + // update status + scheduleService.updateStatus(request.getInlongGroupId(), UPDATED, operator); + return res; + } + + @Override + @Transactional(rollbackFor = Throwable.class) + public Boolean updateAndRegister(ScheduleInfoRequest request, String operator) { + updateOpt(request, operator); + return registerToScheduleEngine(CommonBeanUtils.copyProperties(request, ScheduleInfo::new), operator, true); + } + + /** + * There are three places that may trigger registering schedule info to schedule engine: + * - 1. new group approved with schedule info + * - 2. new schedule info added for an existing approved inlong group + * - 3. group's schedule info updated + * */ + private Boolean registerToScheduleEngine(ScheduleInfo scheduleInfo, String operator, boolean isUpdate) { + // update(un-register and then register) or register + boolean res = isUpdate ? getScheduleEngineClient().update(scheduleInfo) + : getScheduleEngineClient().register(scheduleInfo); + // update status to REGISTERED + scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), REGISTERED, operator); + LOGGER.info("{} schedule info success for group {}", + isUpdate ?
"Update" : "Register", scheduleInfo.getInlongGroupId()); + return res; + } + + private boolean needUpdate(ScheduleInfo scheduleInfo) { + if (scheduleInfo == null) { + return false; + } + ScheduleInfo existedSchedule = getScheduleInfo(scheduleInfo.getInlongGroupId()); + return !scheduleInfo.equals(existedSchedule); + } + + @Override + public Boolean deleteByGroupIdOpt(String groupId, String operator) { + return scheduleService.deleteByGroupId(groupId, operator); + } + + @Override + public Boolean handleGroupApprove(String groupId) { + // if the inlong group exist without schedule info + // then, save the new schedule info when updating inlong group + if (!scheduleInfoExist(groupId)) { + LOGGER.warn("schedule info not exist for group {}", groupId); + return false; + } + ScheduleInfo scheduleInfo = getScheduleInfo(groupId); + // change schedule state to approved + scheduleService.updateStatus(groupId, APPROVED, null); + return registerToScheduleEngine(scheduleInfo, null, false); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java index d00e7134d4..f3eb189c38 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleService.java @@ -17,6 +17,7 @@ package org.apache.inlong.manager.service.schedule; +import org.apache.inlong.manager.common.enums.ScheduleStatus; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; @@ -61,6 +62,16 @@ public interface ScheduleService { Boolean update(@Valid @NotNull(message = "schedule request cannot be null") ScheduleInfoRequest request, String operator); + /** + * Update status of schedule info. 
+ * + * @param groupId group to update schedule status + * @param newStatus status to update + * @param operator name of operator + * @return whether succeed + */ + Boolean updateStatus(String groupId, ScheduleStatus newStatus, String operator); + /** * Delete schedule info for groupId. * @param groupId groupId to find a schedule info to delete diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java index 480189da9e..6459a811cd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleServiceImpl.java @@ -35,7 +35,18 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Objects; +import java.util.Set; + +import static org.apache.inlong.manager.common.enums.ScheduleStatus.APPROVED; +import static org.apache.inlong.manager.common.enums.ScheduleStatus.DELETED; +import static org.apache.inlong.manager.common.enums.ScheduleStatus.NEW; +import static org.apache.inlong.manager.common.enums.ScheduleStatus.REGISTERED; +import static org.apache.inlong.manager.common.enums.ScheduleStatus.UPDATED; @Service public class ScheduleServiceImpl implements ScheduleService { @@ -47,6 +58,9 @@ public class ScheduleServiceImpl implements ScheduleService { @Autowired private ScheduleEntityMapper scheduleEntityMapper; + // finite state machine + private Map<ScheduleStatus, Set<ScheduleStatus>> fsm; + @Override public int save(ScheduleInfoRequest request, String operator) { LOGGER.debug("begin to save schedule info, scheduleInfo={},
operator={}", request, operator); @@ -62,7 +76,8 @@ public class ScheduleServiceImpl implements ScheduleService { scheduleEntity.setStatus(ScheduleStatus.NEW.getCode()); scheduleEntity.setCreator(operator); scheduleEntity.setModifier(operator); - return scheduleEntityMapper.insert(scheduleEntity); + scheduleEntityMapper.insert(scheduleEntity); + return scheduleEntity.getId(); } @Override @@ -97,6 +112,43 @@ public class ScheduleServiceImpl implements ScheduleService { return true; } + @Override + public Boolean updateStatus(String groupId, ScheduleStatus newStatus, String operator) { + LOGGER.debug("begin to update schedule status for groupId={}", groupId); + ScheduleEntity entity = getScheduleEntity(groupId); + ScheduleStatus preStatus = ScheduleStatus.forCode(entity.getStatus()); + if (!isAllowedTransition(preStatus, newStatus)) { + String errorMsg = String.format("Schedule status transition is not allowed from %s to %s for group %s", + preStatus, newStatus, groupId); + LOGGER.error(errorMsg); + throw new BusinessException(ErrorCodeEnum.SCHEDULE_STATUS_TRANSITION_NOT_ALLOWED); + } + entity.setStatus(newStatus.getCode()); + entity.setModifier(operator); + updateScheduleInfo(entity, + String.format("update schedule status from %s to %s failed for groupId=%s", + preStatus.getCode(), newStatus.getCode(), entity.getInlongGroupId())); + LOGGER.info("success to update schedule status from {} to {} for groupId={}", + preStatus.getCode(), newStatus.getCode(), groupId); + return true; + } + + private void initFSMIfNeed() { + if (fsm != null) { + return; + } + fsm = new HashMap<>(); + fsm.put(NEW, new HashSet<>(Arrays.asList(APPROVED, DELETED))); + fsm.put(APPROVED, new HashSet<>(Arrays.asList(REGISTERED, DELETED))); + fsm.put(REGISTERED, new HashSet<>(Arrays.asList(UPDATED, DELETED))); + fsm.put(UPDATED, new HashSet<>(Arrays.asList(REGISTERED, DELETED))); + } + + private boolean isAllowedTransition(ScheduleStatus preStatus, ScheduleStatus newStatus) { + initFSMIfNeed(); + 
return fsm.get(preStatus).contains(newStatus); + } + @Override public Boolean deleteByGroupId(String groupId, String operator) { LOGGER.debug("begin to delete schedule info for groupId={}", groupId); diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java index 801cc09b0e..4af07854c2 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InLongSchedulerController.java @@ -25,7 +25,7 @@ import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest; import org.apache.inlong.manager.pojo.user.LoginUserUtils; import org.apache.inlong.manager.service.operationlog.OperationLog; -import org.apache.inlong.manager.service.schedule.ScheduleService; +import org.apache.inlong.manager.service.schedule.ScheduleOperator; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; @@ -45,35 +45,43 @@ import org.springframework.web.bind.annotation.RestController; public class InLongSchedulerController { @Autowired - private ScheduleService scheduleService; + private ScheduleOperator scheduleOperator; @RequestMapping(value = "/schedule/save", method = RequestMethod.POST) @OperationLog(operation = OperationType.CREATE, operationTarget = OperationTarget.SCHEDULE) @ApiOperation(value = "Save schedule info") public Response<Integer> save(@RequestBody ScheduleInfoRequest request) { - int result = scheduleService.save(request, LoginUserUtils.getLoginUser().getName()); - return Response.success(result); + int scheduleInfoId = scheduleOperator.saveOpt(request, LoginUserUtils.getLoginUser().getName()); + return Response.success(scheduleInfoId); } 
@RequestMapping(value = "/schedule/exist/{groupId}", method = RequestMethod.GET) @ApiOperation(value = "Is the schedule info exists for inlong group") @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true) public Response<Boolean> exist(@PathVariable String groupId) { - return Response.success(scheduleService.exist(groupId)); + return Response.success(scheduleOperator.scheduleInfoExist(groupId)); } @RequestMapping(value = "/schedule/get", method = RequestMethod.GET) @ApiOperation(value = "Get schedule info for inlong group") @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true) public Response<ScheduleInfo> get(@RequestParam String groupId) { - return Response.success(scheduleService.get(groupId)); + return Response.success(scheduleOperator.getScheduleInfo(groupId)); } @RequestMapping(value = "/schedule/update", method = RequestMethod.POST) @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.SCHEDULE) @ApiOperation(value = "Update schedule info") public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody ScheduleInfoRequest request) { - return Response.success(scheduleService.update(request, LoginUserUtils.getLoginUser().getName())); + return Response.success(scheduleOperator.updateOpt(request, LoginUserUtils.getLoginUser().getName())); + } + + @RequestMapping(value = "/schedule/updateAndRegister", method = RequestMethod.POST) + @OperationLog(operation = OperationType.UPDATE, operationTarget = OperationTarget.SCHEDULE) + @ApiOperation(value = "Update schedule info and register to schedule engine") + public Response<Boolean> updateAndRegister( + @Validated(UpdateValidation.class) @RequestBody ScheduleInfoRequest request) { + return Response.success(scheduleOperator.updateAndRegister(request, LoginUserUtils.getLoginUser().getName())); } @RequestMapping(value = "/schedule/delete/{groupId}", method = RequestMethod.DELETE) @@ -82,7 +90,7 @@ public class 
InLongSchedulerController { @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true) public Response<Boolean> delete(@PathVariable String groupId) { String operator = LoginUserUtils.getLoginUser().getName(); - return Response.success(scheduleService.deleteByGroupId(groupId, operator)); + return Response.success(scheduleOperator.deleteByGroupIdOpt(groupId, operator)); } } diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index 81b32bb941..60f82953a5 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -98,4 +98,8 @@ cls.manager.endpoint=127.0.0.1 manager.url=127.0.0.1:8083 -agent.install.path= \ No newline at end of file +agent.install.path= + +# schedule engine type +# support none(no scheduler) and quartz(quartz scheduler), default is none +inlong.schedule.engine=none \ No newline at end of file diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index 6143155bf0..3e8f329470 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -95,3 +95,6 @@ group.deleted.enabled=false # Tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 +# schedule engine type +# support none(no scheduler) and quartz(quartz scheduler), default is none +inlong.schedule.engine=none \ No newline at end of file diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index dcab3fb1cf..5ff929c2b8 100644 --- 
a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -96,3 +96,6 @@ group.deleted.enabled=false # Tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 +# schedule engine type +# support none(no scheduler) and quartz(quartz scheduler), default is none +inlong.schedule.engine=none \ No newline at end of file diff --git a/inlong-manager/manager-web/src/main/resources/application.properties b/inlong-manager/manager-web/src/main/resources/application.properties index 6b56dfb3d9..a6eec820ad 100644 --- a/inlong-manager/manager-web/src/main/resources/application.properties +++ b/inlong-manager/manager-web/src/main/resources/application.properties @@ -66,3 +66,7 @@ audit.user.ids=3,4,5,6 # tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 + +# schedule engine type +# support none(no scheduler) and quartz(quartz scheduler), default is none +inlong.schedule.engine=none \ No newline at end of file
