This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git
commit f5950f92321ee05806092466aa37daa5abfa27a0 Author: AloysZhang <[email protected]> AuthorDate: Thu Jun 13 22:44:42 2024 +0800 [INLONG-10396][Manager] Support build-in schedule base on quartz (#10412) --- .../inlong/manager/dao/entity/ScheduleEntity.java | 2 +- .../inlong/manager/pojo/schedule/ScheduleInfo.java | 4 +- .../manager/pojo/schedule/ScheduleInfoRequest.java | 4 +- inlong-manager/manager-schedule/pom.xml | 31 ++++ .../org/apache/inlong/schedule/ScheduleType.java | 45 ++++++ .../org/apache/inlong/schedule/ScheduleUnit.java | 48 ++++++ .../exception/QuartzScheduleException.java | 32 ++++ .../schedule/quartz/QuartzOfflineSyncJob.java | 40 +++++ .../schedule/quartz/QuartzScheduleClient.java | 50 +++++++ .../schedule/quartz/QuartzScheduleEngine.java | 156 ++++++++++++++++++++ .../schedule/quartz/QuartzSchedulerListener.java | 137 +++++++++++++++++ .../apache/inlong/schedule/util/ScheduleUtils.java | 139 +++++++++++++++++ .../apache/inlong/schedule/BaseScheduleTest.java | 116 +++++++++++++++ .../org/apache/inlong/schedule/quartz/MockJob.java | 48 ++++++ .../schedule/quartz/QuartzScheduleEngineTest.java | 161 ++++++++++++++++++++ .../inlong/schedule/util/ScheduleUtilsTest.java | 164 +++++++++++++++++++++ .../manager-schedule/src/test/resources/log4j2.xml | 46 ++++++ .../main/resources/h2/apache_inlong_manager.sql | 2 +- .../manager-web/sql/apache_inlong_manager.sql | 2 +- inlong-manager/manager-web/sql/changes-1.13.0.sql | 2 +- inlong-manager/pom.xml | 2 +- 21 files changed, 1222 insertions(+), 9 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java index 75237343cb..b2d49cd43c 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java @@ -34,7 +34,7 @@ public class ScheduleEntity implements Serializable { // schedule type, support [normal, crontab], 0 for normal and 1 for crontab private Integer scheduleType; // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway] - // M=month, W=week, D=day, H=hour, M=minute, O=oneway + // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway private String scheduleUnit; private Integer scheduleInterval; // schedule start time, long type timestamp diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java index 13afce70a4..2386d817bb 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java @@ -50,7 +50,7 @@ public class ScheduleInfo { private Integer scheduleType; // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway] - // M=month, W=week, D=day, H=hour, M=minute, O=oneway + // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway @ApiModelProperty("TimeUnit for schedule interval") private String scheduleUnit; @@ -73,7 +73,7 @@ public class ScheduleInfo { private Integer taskParallelism; @ApiModelProperty("Schedule task parallelism") - private Integer crontabExpression; + private String crontabExpression; @ApiModelProperty(value = "Version number") @NotNull(groups = UpdateValidation.class, message = "version cannot be null") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java index eff1660719..b3c117da9a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java @@ -44,7 +44,7 @@ public class ScheduleInfoRequest { private Integer scheduleType; // time unit for offline task schedule interval, support [month, week, day, hour, minute, oneway] - // M=month, W=week, D=day, H=hour, M=minute, O=oneway + // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway @ApiModelProperty("TimeUnit for schedule interval") private String scheduleUnit; @@ -67,7 +67,7 @@ public class ScheduleInfoRequest { private Integer taskParallelism; @ApiModelProperty("Schedule task parallelism") - private Integer crontabExpression; + private String crontabExpression; @ApiModelProperty(value = "Version number") @NotNull(groups = UpdateValidation.class, message = "version cannot be null") diff --git a/inlong-manager/manager-schedule/pom.xml b/inlong-manager/manager-schedule/pom.xml index f3a4e0a41f..8d598b47ca 100644 --- a/inlong-manager/manager-schedule/pom.xml +++ b/inlong-manager/manager-schedule/pom.xml @@ -27,6 +27,7 @@ <artifactId>manager-schedule</artifactId> <properties> + <quartz.version>2.3.2</quartz.version> <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> @@ -37,5 +38,35 @@ <artifactId>manager-pojo</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.quartz-scheduler</groupId> + <artifactId>quartz</artifactId> + <version>${quartz.version}</version> + <exclusions> + <exclusion> + <groupId>com.mchange</groupId> + <artifactId>c3p0</artifactId> + </exclusion> + <exclusion> + <groupId>com.mchange</groupId> + <artifactId>mchange-commons-java</artifactId> + </exclusion> + <exclusion> + <groupId>com.zaxxer</groupId> + <artifactId>HikariCP-java7</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>${awaitility.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java new file mode 100644 index 0000000000..0f296fec42 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule; + +import lombok.Getter; + +/** + * Type of schedule, including normal and crontab. + * */ +@Getter +public enum ScheduleType { + + NORMAL(0), + CRONTAB(1); + + private final int code; + + ScheduleType(int code) { + this.code = code; + } + + public static ScheduleType fromCode(int code) { + for (ScheduleType type : ScheduleType.values()) { + if (type.code == code) { + return type; + } + } + return null; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java new file mode 100644 index 0000000000..7b4d60779f --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule; + +import lombok.Getter; + +@Getter +public enum ScheduleUnit { + + YEAR("Y"), + MONTH("M"), + DAY("D"), + WEEK("W"), + HOUR("H"), + MINUTE("I"), + SECOND("S"), + ONE_WAY("O"); + + final String unit; + + ScheduleUnit(String unit) { + this.unit = unit; + } + + public static ScheduleUnit getScheduleUnit(String unit) { + for (ScheduleUnit scheduleUnit : ScheduleUnit.values()) { + if (scheduleUnit.unit.equalsIgnoreCase(unit)) { + return scheduleUnit; + } + } + return null; + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java new file mode 100644 index 0000000000..2d0ff005bd --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule.exception; + +/** + * Exceptions occur in the schedule procedure. + * */ +public class QuartzScheduleException extends RuntimeException { + + public QuartzScheduleException(String message) { + super(message); + } + + public QuartzScheduleException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java new file mode 100644 index 0000000000..27628b3320 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule.quartz; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class QuartzOfflineSyncJob implements Job { + + private ScheduleInfo scheduleInfo; + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + // TODO: complete the offline sync logic + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java new file mode 100644 index 0000000000..41fc8814a1 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule.quartz; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.schedule.ScheduleEngineClient; + +/** + * Built-in implementation of schedule engine client corresponding with {@link QuartzScheduleEngine}. + * QuartzScheduleClient simply invokes the {@link QuartzScheduleEngine} to register/unregister/update + * schedule info instead of calling a remote schedule service. + * */ +public class QuartzScheduleClient implements ScheduleEngineClient { + + private final QuartzScheduleEngine scheduleEngine; + + public QuartzScheduleClient(QuartzScheduleEngine scheduleEngine) { + this.scheduleEngine = scheduleEngine; + } + + @Override + public boolean register(ScheduleInfo scheduleInfo) { + return scheduleEngine.handleRegister(scheduleInfo); + } + + @Override + public boolean unregister(ScheduleInfo scheduleInfo) { + return scheduleEngine.handleUnregister(scheduleInfo); + } + + @Override + public boolean update(ScheduleInfo scheduleInfo) { + return scheduleEngine.handleUpdate(scheduleInfo); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java new file mode 100644 index 0000000000..31736f1887 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule.quartz; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.schedule.ScheduleEngine; +import org.apache.inlong.schedule.exception.QuartzScheduleException; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Getter; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzJobDetail; +import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzTrigger; + +/** + * The default implementation of schedule engine based on Quartz scheduler. Response for processing + * the register/unregister/update requests from {@link QuartzScheduleClient} + * */ +@Getter +public class QuartzScheduleEngine implements ScheduleEngine { + + private static final Logger LOGGER = LoggerFactory.getLogger(QuartzScheduleEngine.class); + + private final Scheduler scheduler; + private final Set<String> scheduledJobSet = new HashSet<>(); + + public QuartzScheduleEngine() { + try { + this.scheduler = new StdSchedulerFactory().getScheduler(); + LOGGER.info("Quartz scheduler engine initialized"); + } catch (SchedulerException e) { + throw new QuartzScheduleException("Failed to init quartz scheduler ", e); + } + } + + @Override + public void start() { + try { + // add listener + scheduler.getListenerManager().addSchedulerListener(new QuartzSchedulerListener(this)); + scheduler.start(); + LOGGER.info("Quartz scheduler engine started"); + } catch (SchedulerException e) { + throw new QuartzScheduleException("Failed to start quartz scheduler ", e); + } + } + + /** + * Clean job info from scheduledJobSet after trigger finalized. + * */ + public boolean triggerFinalized(Trigger trigger) { + String jobName = trigger.getJobKey().getName(); + LOGGER.info("Trigger finalized for job {}", jobName); + return scheduledJobSet.remove(jobName); + } + + /** + * Handle schedule register. + * @param scheduleInfo schedule info to register + * */ + @Override + public boolean handleRegister(ScheduleInfo scheduleInfo) { + return handleRegister(scheduleInfo, QuartzOfflineSyncJob.class); + } + + @VisibleForTesting + public boolean handleRegister(ScheduleInfo scheduleInfo, Class<? extends QuartzOfflineSyncJob> clz) { + if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) { + throw new QuartzScheduleException("Group " + scheduleInfo.getInlongGroupId() + " is already registered"); + } + JobDetail jobDetail = genQuartzJobDetail(scheduleInfo, clz); + Trigger trigger = genQuartzTrigger(jobDetail, scheduleInfo); + try { + scheduler.scheduleJob(jobDetail, trigger); + scheduledJobSet.add(scheduleInfo.getInlongGroupId()); + LOGGER.info("Registered new schedule info for {}", scheduleInfo.getInlongGroupId()); + } catch (SchedulerException e) { + throw new QuartzScheduleException(e.getMessage()); + } + return false; + } + + /** + * Handle schedule unregister. + * @param scheduleInfo schedule info to unregister + * */ + @Override + public boolean handleUnregister(ScheduleInfo scheduleInfo) { + if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) { + try { + scheduler.deleteJob(new JobKey(scheduleInfo.getInlongGroupId())); + } catch (SchedulerException e) { + throw new QuartzScheduleException(e.getMessage()); + } + } + scheduledJobSet.remove(scheduleInfo.getInlongGroupId()); + LOGGER.info("Un-registered schedule info for {}", scheduleInfo.getInlongGroupId()); + return true; + } + + /** + * Handle schedule update. + * @param scheduleInfo schedule info to update + * */ + @Override + public boolean handleUpdate(ScheduleInfo scheduleInfo) { + return handleUpdate(scheduleInfo, QuartzOfflineSyncJob.class); + } + + @VisibleForTesting + public boolean handleUpdate(ScheduleInfo scheduleInfo, Class<? extends QuartzOfflineSyncJob> clz) { + handleUnregister(scheduleInfo); + handleRegister(scheduleInfo, clz); + LOGGER.info("Updated schedule info for {}", scheduleInfo.getInlongGroupId()); + return false; + } + + @Override + public void stop() { + if (scheduler != null) { + try { + scheduler.shutdown(); + LOGGER.info("Quartz scheduler engine stopped"); + } catch (SchedulerException e) { + throw new QuartzScheduleException("Failed to stop quartz scheduler ", e); + } + } + } + +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java new file mode 100644 index 0000000000..ca4f9f1b03 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule.quartz; + +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.SchedulerException; +import org.quartz.SchedulerListener; +import org.quartz.Trigger; +import org.quartz.TriggerKey; + +/** + * Default implementation for quartz scheduler listener. + * */ +public class QuartzSchedulerListener implements SchedulerListener { + + QuartzScheduleEngine quartzScheduleEngine; + + public QuartzSchedulerListener(QuartzScheduleEngine quartzScheduleEngine) { + this.quartzScheduleEngine = quartzScheduleEngine; + } + + @Override + public void jobScheduled(Trigger trigger) { + + } + + @Override + public void jobUnscheduled(TriggerKey triggerKey) { + + } + + @Override + public void triggerFinalized(Trigger trigger) { + quartzScheduleEngine.triggerFinalized(trigger); + } + + @Override + public void triggerPaused(TriggerKey triggerKey) { + + } + + @Override + public void triggersPaused(String triggerGroup) { + + } + + @Override + public void triggerResumed(TriggerKey triggerKey) { + + } + + @Override + public void triggersResumed(String triggerGroup) { + + } + + @Override + public void jobAdded(JobDetail jobDetail) { + + } + + @Override + public void jobDeleted(JobKey jobKey) { + + } + + @Override + public void jobPaused(JobKey jobKey) { + + } + + @Override + public void jobsPaused(String jobGroup) { + + } + + @Override + public void jobResumed(JobKey jobKey) { + + } + + @Override + public void jobsResumed(String jobGroup) { + + } + + @Override + public void schedulerError(String msg, SchedulerException cause) { + + } + + @Override + public void schedulerInStandbyMode() { + + } + + @Override + public void schedulerStarted() { + + } + + @Override + public void schedulerStarting() { + + } + + @Override + public void schedulerShutdown() { + + } + + @Override + public void schedulerShuttingdown() { + + } + + @Override + public void schedulingDataCleared() { + + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java new file mode 100644 index 0000000000..73114fcb6c --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule.util; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.schedule.ScheduleType; +import org.apache.inlong.schedule.ScheduleUnit; +import org.apache.inlong.schedule.exception.QuartzScheduleException; +import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob; + +import org.apache.commons.lang3.StringUtils; +import org.quartz.CronScheduleBuilder; +import org.quartz.CronTrigger; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.ScheduleBuilder; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.SimpleTrigger; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; + +import java.sql.Timestamp; +import java.util.Date; + +/** + * Tools for schedule. + * */ +public class ScheduleUtils { + + public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo, Class<? extends QuartzOfflineSyncJob> clz) { + return JobBuilder.newJob(clz) + .withIdentity(scheduleInfo.getInlongGroupId()) + .build(); + } + + public static Trigger genQuartzTrigger(JobDetail jobDetail, ScheduleInfo scheduleInfo) { + String key = jobDetail.getKey().getName(); + Timestamp startTime = scheduleInfo.getStartTime(); + Timestamp endTime = scheduleInfo.getEndTime(); + int scheduleType = scheduleInfo.getScheduleType(); + ScheduleType type = ScheduleType.fromCode(scheduleType); + if (type == null) { + throw new QuartzScheduleException("Invalid schedule type: " + scheduleType); + } + switch (type) { + case NORMAL: + return TriggerBuilder.newTrigger() + .withIdentity(key) + .startAt(new Date(startTime.getTime())) + .endAt(new Date(endTime.getTime())) + .withSchedule(genSimpleQuartzScheduleBuilder(scheduleInfo.getScheduleInterval(), + scheduleInfo.getScheduleUnit())) + .forJob(jobDetail).build(); + case CRONTAB: + return TriggerBuilder.newTrigger() + .withIdentity(key) + .startAt(new Date(startTime.getTime())) + .endAt(new Date(endTime.getTime())) + .withSchedule(genCronQuartzScheduleBuilder(scheduleInfo.getCrontabExpression())) + .forJob(jobDetail).build(); + default: + throw new QuartzScheduleException("Unknown schedule type: " + scheduleType); + } + } + + // Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway + public static ScheduleBuilder<SimpleTrigger> genSimpleQuartzScheduleBuilder(int interval, String scheduleUnit) { + if (StringUtils.isBlank(scheduleUnit)) { + throw new QuartzScheduleException("Schedule unit cannot be empty"); + } + ScheduleUnit unit = ScheduleUnit.getScheduleUnit(scheduleUnit); + if (unit == null) { + throw new QuartzScheduleException("Unknown schedule unit: " + scheduleUnit); + } + switch (unit) { + case YEAR: + return SimpleScheduleBuilder + .simpleSchedule() + .withIntervalInHours(365 * 24 * interval) + .repeatForever(); + case MONTH: + return SimpleScheduleBuilder + .simpleSchedule() + .withIntervalInHours(30 * 24 * interval) + .repeatForever(); + case WEEK: + return SimpleScheduleBuilder + .simpleSchedule() + .withIntervalInHours(7 * 24 * interval) + .repeatForever(); + case DAY: + return SimpleScheduleBuilder + .simpleSchedule() + .withIntervalInHours(24 * interval) + .repeatForever(); + case HOUR: + return SimpleScheduleBuilder + .simpleSchedule() + .withIntervalInHours(interval) + .repeatForever(); + case MINUTE: + return SimpleScheduleBuilder + .simpleSchedule() + .withIntervalInMinutes(interval) + .repeatForever(); + case SECOND: + return SimpleScheduleBuilder + .simpleSchedule() + .withIntervalInSeconds(interval) + .repeatForever(); + case ONE_WAY: + return SimpleScheduleBuilder + .simpleSchedule() + .withIntervalInSeconds(interval) + .withRepeatCount(1); + default: + throw new QuartzScheduleException("Not supported schedule interval" + scheduleUnit); + } + } + + public static ScheduleBuilder<CronTrigger> genCronQuartzScheduleBuilder(String cronExpression) { + return CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionIgnoreMisfires(); + } +} diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java new file mode 100644 index 0000000000..c0ccc4b14c --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.schedule.exception.QuartzScheduleException; + +import java.sql.Timestamp; + +import static org.apache.inlong.schedule.ScheduleUnit.SECOND; + +public class BaseScheduleTest { + + public static final int SCHEDULE_TYPE_NORMAL = 0; + public static final int SCHEDULE_TYPE_CRON = 1; + public static final int DEFAULT_INTERVAL = 2; + public static final long DEFAULT_SPAN_IN_MS = 10 * 1000; + public static final String ILLEGAL_TIMEUNIT = "I"; + public static final String GROUP_ID = "testGroup"; + public static final String CRON_EXPRESSION_PER_SECONDS = "*/1 * * * * ?"; + public static final int CRON_SCHEDULE_INTERVAL_PER_SECONDS = 1; + public static final String CRON_EXPRESSION_EVERY_TWO_SECONDS = "*/2 * * * * ?"; + public static final int CRON_SCHEDULE_INTERVAL_EVERY_TWO_SECONDS = 2; + public static final String ILLEGAL_CRON_EXPRESSION = "*/1 * * ?"; + + public ScheduleInfo genDefaultScheduleInfo() { + return genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(), DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS); + } + + public ScheduleInfo genNormalScheduleInfo(String groupId, String scheduleUnit, int scheduleInterval, + long timeSpanInMs) { + ScheduleInfo scheduleInfo = new ScheduleInfo(); + scheduleInfo.setInlongGroupId(groupId); + scheduleInfo.setScheduleType(SCHEDULE_TYPE_NORMAL); + scheduleInfo.setScheduleUnit(scheduleUnit); + scheduleInfo.setScheduleInterval(scheduleInterval); + setStartAndEndTime(scheduleInfo, timeSpanInMs); + return scheduleInfo; + } + + public ScheduleInfo genDefaultCronScheduleInfo() { + return genCronScheduleInfo(GROUP_ID, CRON_EXPRESSION_PER_SECONDS, DEFAULT_SPAN_IN_MS); + } + + public ScheduleInfo genCronScheduleInfo(String groupId, String cronExpression, long timeSpanInMs) { + ScheduleInfo scheduleInfo = new ScheduleInfo(); + scheduleInfo.setInlongGroupId(groupId); + scheduleInfo.setScheduleType(SCHEDULE_TYPE_CRON); + scheduleInfo.setCrontabExpression(cronExpression); + setStartAndEndTime(scheduleInfo, timeSpanInMs); + return scheduleInfo; + } + + private void setStartAndEndTime(ScheduleInfo scheduleInfo, long timeSpanInMs) { + long startTime = System.currentTimeMillis() / 1000 * 1000; + long endTime = startTime + timeSpanInMs; + scheduleInfo.setStartTime(new Timestamp(startTime)); + scheduleInfo.setEndTime(new Timestamp(endTime)); + } + + protected long calculateScheduleTimes(ScheduleInfo scheduleInfo, boolean isCron) { + + long timeSpanInMs = scheduleInfo.getEndTime().getTime() - scheduleInfo.getStartTime().getTime(); + int interval = -1; + ScheduleUnit scheduleUnit = null; + if (isCron) { + if (scheduleInfo.getCrontabExpression().equalsIgnoreCase(CRON_EXPRESSION_PER_SECONDS)) { + interval = CRON_SCHEDULE_INTERVAL_PER_SECONDS; + } else if (scheduleInfo.getCrontabExpression().equalsIgnoreCase(CRON_EXPRESSION_EVERY_TWO_SECONDS)) { + interval = CRON_SCHEDULE_INTERVAL_EVERY_TWO_SECONDS; + } + scheduleUnit = SECOND; + } else { + interval = scheduleInfo.getScheduleInterval(); + scheduleUnit = ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit()); + } + if (scheduleUnit == null) { + throw new QuartzScheduleException("Schedule unit is null"); + } + switch (scheduleUnit) { + case YEAR: + return timeSpanInMs / 365 / 1000 / 3600 / 24 / 7 / interval; + case MONTH: + return timeSpanInMs / 30 / 1000 / 3600 / 24 / 7 / interval; + case WEEK: + return timeSpanInMs / 1000 / 3600 / 24 / 7 / interval; + case DAY: + return timeSpanInMs / 1000 / 3600 / 24 / interval; + case HOUR: + return timeSpanInMs / 1000 / 3600 / interval; + case MINUTE: + return timeSpanInMs / 1000 / 60 / interval; + case SECOND: + return timeSpanInMs / 1000 / interval; + case ONE_WAY: + return 1; + default: + return 0; + } + } +} diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java new file mode 100644 index 0000000000..47ce19ad09 --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule.quartz; + +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +public class MockJob extends QuartzOfflineSyncJob { + + private static final Logger LOGGER = LoggerFactory.getLogger(MockJob.class); + + public static CountDownLatch countDownLatch; + private static AtomicInteger counter = new AtomicInteger(0); + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + if (countDownLatch.getCount() > 0) { + countDownLatch.countDown(); + } + LOGGER.info("MockJob executed " + counter.incrementAndGet()); + } + + public static void setCount(int count) { + countDownLatch = new CountDownLatch(count); + counter.set(0); + LOGGER.info("MockJob has been reset."); + } +} diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java new file mode 100644 index 0000000000..fc5ca9d9fd --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule.quartz; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.schedule.BaseScheduleTest; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.quartz.JobKey; + +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.schedule.ScheduleUnit.SECOND; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class QuartzScheduleEngineTest extends BaseScheduleTest { + + private static QuartzScheduleEngine scheduleEngine; + + @BeforeAll + public static void initScheduleEngine() throws Exception { + scheduleEngine = new QuartzScheduleEngine(); + scheduleEngine.start(); + } + + @Test + @Timeout(30) + public void testRegisterScheduleInfo() throws Exception { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); + testRegister(scheduleInfo, false); + + // 2. test for cron schedule + scheduleInfo = genDefaultCronScheduleInfo(); + testRegister(scheduleInfo, true); + } + + private void testRegister(ScheduleInfo scheduleInfo, boolean isCrontab) throws Exception { + // cal total schedule times + long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab); + // set countdown latch + MockJob.setCount((int) expectCount); + // register schedule info + scheduleEngine.handleRegister(scheduleInfo, MockJob.class); + // check job exist + assertEquals(1, scheduleEngine.getScheduledJobSet().size()); + JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId()); + boolean exist = scheduleEngine.getScheduler().checkExists(jobKey); + assertTrue(exist); + MockJob.countDownLatch.await(); + + // not job exist after scheduled + await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0, scheduleEngine.getScheduledJobSet().size()); + assertFalse(scheduleEngine.getScheduler().checkExists(jobKey)); + }); + } + + @Test + @Timeout(30) + public void testUnRegisterScheduleInfo() throws Exception { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); + testUnRegister(scheduleInfo, false); + + // 2. test for cron schedule, gen cron schedule info, */2 * * * * ? + scheduleInfo = genDefaultCronScheduleInfo(); + testUnRegister(scheduleInfo, true); + } + + private void testUnRegister(ScheduleInfo scheduleInfo, boolean isCrontab) throws Exception { + // cal total schedule times + long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab); + + MockJob.setCount((int) (expectCount / 2)); + // register schedule info + scheduleEngine.handleRegister(scheduleInfo, MockJob.class); + // check job exist + assertEquals(1, scheduleEngine.getScheduledJobSet().size()); + JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId()); + boolean exist = scheduleEngine.getScheduler().checkExists(jobKey); + assertTrue(exist); + MockJob.countDownLatch.await(); + + // un-register before trigger finalized + scheduleEngine.handleUnregister(scheduleInfo); + // not job exist after un-register + assertEquals(0, scheduleEngine.getScheduledJobSet().size()); + exist = scheduleEngine.getScheduler().checkExists(jobKey); + assertFalse(exist); + } + + @Test + @Timeout(50) + public void testUpdateScheduleInfo() throws Exception { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = + genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(), DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS); + ScheduleInfo scheduleInfoToUpdate = + genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(), DEFAULT_INTERVAL / 2, DEFAULT_SPAN_IN_MS); + testUpdate(scheduleInfo, scheduleInfoToUpdate, false); + + // 2. test for cron schedule + scheduleInfo = genCronScheduleInfo(GROUP_ID, CRON_EXPRESSION_EVERY_TWO_SECONDS, DEFAULT_SPAN_IN_MS); + scheduleInfoToUpdate = genCronScheduleInfo(GROUP_ID, CRON_EXPRESSION_PER_SECONDS, DEFAULT_SPAN_IN_MS); + testUpdate(scheduleInfo, scheduleInfoToUpdate, true); + } + + public void testUpdate(ScheduleInfo scheduleInfo, ScheduleInfo scheduleInfoToUpdate, boolean isCrontab) + throws Exception { + // cal total schedule times + long expectCount = calculateScheduleTimes(scheduleInfo, isCrontab); + MockJob.setCount((int) (expectCount / 2)); + // register schedule info + scheduleEngine.handleRegister(scheduleInfo, MockJob.class); + // check job exist + assertEquals(1, scheduleEngine.getScheduledJobSet().size()); + JobKey jobKey = new JobKey(scheduleInfo.getInlongGroupId()); + boolean exist = scheduleEngine.getScheduler().checkExists(jobKey); + assertTrue(exist); + MockJob.countDownLatch.await(); + + // update schedule before trigger finalized + expectCount = calculateScheduleTimes(scheduleInfoToUpdate, isCrontab); + MockJob.setCount((int) expectCount); + scheduleEngine.handleUpdate(scheduleInfoToUpdate, MockJob.class); + + // job scheduled after updated + assertEquals(1, scheduleEngine.getScheduledJobSet().size()); + exist = scheduleEngine.getScheduler().checkExists(jobKey); + assertTrue(exist); + + MockJob.countDownLatch.await(); + + // not job exist after scheduled + await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0, scheduleEngine.getScheduledJobSet().size()); + assertFalse(scheduleEngine.getScheduler().checkExists(jobKey)); + }); + } +} diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java new file mode 100644 index 0000000000..77b84f3064 --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.schedule.util; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.schedule.BaseScheduleTest; +import org.apache.inlong.schedule.exception.QuartzScheduleException; +import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob; + +import org.junit.jupiter.api.Test; +import org.quartz.CronScheduleBuilder; +import org.quartz.CronTrigger; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.ScheduleBuilder; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.SimpleTrigger; +import org.quartz.Trigger; +import org.quartz.TriggerKey; + +import java.util.Date; + +import static org.apache.inlong.schedule.ScheduleUnit.DAY; +import static org.apache.inlong.schedule.ScheduleUnit.HOUR; +import static org.apache.inlong.schedule.ScheduleUnit.MINUTE; +import static org.apache.inlong.schedule.ScheduleUnit.MONTH; +import static org.apache.inlong.schedule.ScheduleUnit.ONE_WAY; +import static org.apache.inlong.schedule.ScheduleUnit.WEEK; +import static org.apache.inlong.schedule.ScheduleUnit.YEAR; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ScheduleUtilsTest extends BaseScheduleTest { + + @Test + public void testGenScheduleBuilder() { + ScheduleBuilder<SimpleTrigger> builder = + ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, YEAR.getUnit()); + assertNotNull(builder); + assertInstanceOf(SimpleScheduleBuilder.class, builder); + + builder = ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, MONTH.getUnit()); + assertNotNull(builder); + assertInstanceOf(SimpleScheduleBuilder.class, builder); + + builder = ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, WEEK.getUnit()); + assertNotNull(builder); + assertInstanceOf(SimpleScheduleBuilder.class, builder); + + builder = ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, DAY.getUnit()); + assertNotNull(builder); + assertInstanceOf(SimpleScheduleBuilder.class, builder); + + builder = ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, HOUR.getUnit()); + assertNotNull(builder); + assertInstanceOf(SimpleScheduleBuilder.class, builder); + + builder = ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, MINUTE.getUnit()); + assertNotNull(builder); + assertInstanceOf(SimpleScheduleBuilder.class, builder); + + builder = ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, ONE_WAY.getUnit()); + assertNotNull(builder); + assertInstanceOf(SimpleScheduleBuilder.class, builder); + + try { + ScheduleUtils.genSimpleQuartzScheduleBuilder(DEFAULT_INTERVAL, ILLEGAL_TIMEUNIT); + } catch (Exception e) { + assertInstanceOf(QuartzScheduleException.class, e); + } + + ScheduleBuilder<CronTrigger> cronBuilder = ScheduleUtils.genCronQuartzScheduleBuilder( + CRON_EXPRESSION_EVERY_TWO_SECONDS); + assertNotNull(cronBuilder); + assertInstanceOf(CronScheduleBuilder.class, cronBuilder); + + try { + ScheduleUtils.genCronQuartzScheduleBuilder(ILLEGAL_CRON_EXPRESSION); + } catch (Exception e) { + String errorMsg = e.getMessage(); + assertTrue(errorMsg.contains(ILLEGAL_CRON_EXPRESSION)); + } + } + + @Test + public void testGenJobDetail() { + ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); + JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, QuartzOfflineSyncJob.class); + assertNotNull(jobDetail); + + JobKey jobKey = jobDetail.getKey(); + assertNotNull(jobKey); + + String identity = jobKey.getName(); + assertEquals(scheduleInfo.getInlongGroupId(), identity); + } + + @Test + public void testGenCronTrigger() { + // normal + ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); + JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, QuartzOfflineSyncJob.class); + + Trigger trigger = ScheduleUtils.genQuartzTrigger(jobDetail, scheduleInfo); + assertNotNull(trigger); + + TriggerKey triggerKey = trigger.getKey(); + assertNotNull(triggerKey); + String identity = triggerKey.getName(); + assertEquals(scheduleInfo.getInlongGroupId(), identity); + + ScheduleBuilder<? extends Trigger> scheduleBuilder = trigger.getScheduleBuilder(); + assertInstanceOf(SimpleScheduleBuilder.class, scheduleBuilder); + + Date startDate = trigger.getStartTime(); + assertNotNull(startDate); + assertEquals(startDate.getTime(), scheduleInfo.getStartTime().getTime()); + + Date endDate = trigger.getEndTime(); + assertNotNull(endDate); + assertEquals(endDate.getTime(), scheduleInfo.getEndTime().getTime()); + + // cron + scheduleInfo = genDefaultCronScheduleInfo(); + jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, QuartzOfflineSyncJob.class); + + trigger = ScheduleUtils.genQuartzTrigger(jobDetail, scheduleInfo); + assertNotNull(trigger); + + triggerKey = trigger.getKey(); + assertNotNull(triggerKey); + identity = triggerKey.getName(); + assertEquals(scheduleInfo.getInlongGroupId(), identity); + + scheduleBuilder = trigger.getScheduleBuilder(); + assertInstanceOf(CronScheduleBuilder.class, scheduleBuilder); + + startDate = trigger.getStartTime(); + assertNotNull(startDate); + assertEquals(startDate.getTime(), scheduleInfo.getStartTime().getTime()); + + endDate = trigger.getEndTime(); + assertNotNull(endDate); + assertEquals(endDate.getTime(), scheduleInfo.getEndTime().getTime()); + + } +} diff --git a/inlong-manager/manager-schedule/src/test/resources/log4j2.xml b/inlong-manager/manager-schedule/src/test/resources/log4j2.xml new file mode 100644 index 0000000000..8f2da663e7 --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/resources/log4j2.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<configuration status="WARN" monitorInterval="30"> + <Properties> + <property name="basePath">logs</property> + <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} -%5p [%5.30t] %-30.30C{1.}:%L - %m%n</property> + <property name="output_log_level">DEBUG</property> + <property name="all_fileName">${basePath}/manager-service-ut.log</property> + <property name="console_print_level">DEBUG</property> + </Properties> + + <appenders> + <Console name="Console" target="SYSTEM_OUT"> + <ThresholdFilter level="${console_print_level}" onMatch="ACCEPT" onMismatch="DENY"/> + <PatternLayout pattern="${log_pattern}"/> + <follow>true</follow> + </Console> + <File name="AllFile" fileName="${all_fileName}"> + <PatternLayout pattern="${log_pattern}"/> + </File> + </appenders> + + <loggers> + <root level="${output_log_level}"> + <appender-ref ref="Console"/> + <appender-ref ref="AllFile"/> + </root> + </loggers> +</configuration> \ No newline at end of file diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index a8b545f7bf..babe5139f8 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -970,7 +970,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config` `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated', `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab', - `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway', + `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway', `schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval', `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule', `end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'End time for schedule', diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index f7dc1c0a90..b4458af76f 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -1022,7 +1022,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config` `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated', `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab', - `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway', + `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway', `schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval', `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule', `end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'End time for schedule', diff --git a/inlong-manager/manager-web/sql/changes-1.13.0.sql b/inlong-manager/manager-web/sql/changes-1.13.0.sql index 38e7627c5e..6a248ef0a2 100644 --- a/inlong-manager/manager-web/sql/changes-1.13.0.sql +++ b/inlong-manager/manager-web/sql/changes-1.13.0.sql @@ -97,7 +97,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config` `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key', `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id, undeleted ones cannot be repeated', `schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT 'Schedule type, 0 for normal, 1 for crontab', - `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit,M=month, W=week, D=day, H=hour, M=minute, O=oneway', + `schedule_unit` varchar(64) NOT NULL COMMENT 'Schedule unit, Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneway', `schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule interval', `start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Start time for schedule', `end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'End time for schedule', diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml index bff220728b..cc7421d1c1 100644 --- a/inlong-manager/pom.xml +++ b/inlong-manager/pom.xml @@ -40,8 +40,8 @@ <module>manager-service</module> <module>manager-workflow</module> <module>manager-web</module> - <module>manager-docker</module> <module>manager-schedule</module> + <module>manager-docker</module> </modules> <properties>
