This is an automated email from the ASF dual-hosted git repository. monster pushed a commit to branch schedule in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 87214c40356380ee15db3cb2cc2e5bde1c105b30 Author: monster <[email protected]> AuthorDate: Wed May 17 21:31:23 2023 +0800 [WIP][Feature]Support offline scheduling --- .../apache/streampark/common/util/DateUtils.scala | 34 ++++- .../streampark/console/core/entity/Schedule.java | 71 +++++++++ .../console/core/enums/ScheduleState.java | 46 ++++++ .../console/core/mapper/SchedulerMapper.java | 24 ++++ .../console/core/quartz/JobScheduleTask.java | 83 +++++++++++ .../console/core/quartz/QuartzExecutors.java | 159 +++++++++++++++++++++ .../console/core/quartz/QuartzTaskUtils.java | 53 +++++++ .../console/core/quartz/SchedulerApi.java | 31 ++++ .../console/core/service/SchedulerService.java | 44 ++++++ .../core/service/impl/SchedulerServiceImpl.java | 71 +++++++++ .../src/main/resources/application.yml | 21 +++ 11 files changed, 636 insertions(+), 1 deletion(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala index 8c62eebe1..fe91ee2b4 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala @@ -16,8 +16,10 @@ */ package org.apache.streampark.common.util +import org.apache.commons.lang3.StringUtils + import java.text.{ParseException, SimpleDateFormat} -import java.time.{Duration, LocalDateTime} +import java.time.{Duration, LocalDateTime, ZonedDateTime, ZoneId} import java.time.format.DateTimeFormatter import java.util._ import java.util.concurrent.TimeUnit @@ -220,4 +222,34 @@ object DateUtils { DateUtils.getDateFormat(d, format) } + def transformTimezoneDate( + date: Date, + sourceTimezoneId: String, + targetTimezoneId: String): Date = { + Option(sourceTimezoneId) + .filter(StringUtils.isNotEmpty) + .flatMap(_ => Option(targetTimezoneId)) + .filter(StringUtils.isNotEmpty) + .map( + _ => { + val dateToString = dateToString(date, sourceTimezoneId) + val localDateTime = + LocalDateTime.parse(dateToString, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + val zonedDateTime = + ZonedDateTime.of(localDateTime, TimeZone.getTimeZone(targetTimezoneId).toZoneId) + Date.from(zonedDateTime.toInstant) + }) + .getOrElse(date) + } + + def transformTimezoneDate(date: Date, targetTimezoneId: String): Date = { + transformTimezoneDate(date, ZoneId.systemDefault().getId, targetTimezoneId) + } + + def getTimezone(timezoneId: String): Option[TimeZone] = { + Option(timezoneId) + .filter(StringUtils.isNotEmpty) + .map(TimeZone.getTimeZone) + } + } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java new file mode 100644 index 000000000..903088f5b --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.entity; + +import org.apache.streampark.console.core.enums.ScheduleState; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.Data; + +import java.util.Date; + +@TableName("t_flink_schedules") +@Data +public class Schedule { + + @TableId(value = "id", type = IdType.AUTO) + private int id; + + private Long appId; + + @TableField(exist = false) + private String description; + + /** schedule start time */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date startTime; + + /** schedule end time */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date endTime; + + /** + * timezoneId + * + * <p>see {@link java.util.TimeZone#getTimeZone(String)} + */ + private String timezoneId; + + /** crontab expression */ + private String crontab; + + /** create time */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date createTime; + + /** update time */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date updateTime; + + /** release state */ + private ScheduleState scheduleState; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ScheduleState.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ScheduleState.java new file mode 100644 index 000000000..4ff720673 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ScheduleState.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; +import lombok.Getter; + +public enum ScheduleState { + + /** 0 offline 1 online */ + OFFLINE(0, "offline"), + ONLINE(1, "online"); + + ScheduleState(int code, String descp) { + this.code = code; + this.descp = descp; + } + + @Getter @EnumValue private final int code; + @Getter private final String descp; + + public static ReleaseState getEnum(int value) { + for (ReleaseState e : ReleaseState.values()) { + if (e.ordinal() == value) { + return e; + } + } + // For values out of enum scope + return null; + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java new file mode 100644 index 000000000..ac672e345 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.mapper; + +import org.apache.streampark.console.core.entity.Schedule; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +public interface SchedulerMapper extends BaseMapper<Schedule> {} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java new file mode 100644 index 000000000..928b058ae --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.quartz; + +import org.apache.streampark.console.core.entity.Schedule; +import org.apache.streampark.console.core.enums.ScheduleState; +import org.apache.streampark.console.core.service.SchedulerService; + +import lombok.extern.slf4j.Slf4j; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.springframework.scheduling.quartz.QuartzJobBean; + +import java.util.Date; + +@Slf4j +public class JobScheduleTask extends QuartzJobBean { + + private final SchedulerService schedulerService; + + public JobScheduleTask(SchedulerService schedulerService) { + this.schedulerService = schedulerService; + } + + @Override + protected void executeInternal(JobExecutionContext context) { + JobDataMap dataMap = context.getJobDetail().getJobDataMap(); + + int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID); + + Date scheduledFireTime = context.getScheduledFireTime(); + + Date fireTime = context.getFireTime(); + + log.info( + "Scheduled fire time :{}, fire time :{}, process id :{}.", + scheduledFireTime, + fireTime, + scheduleId); + + // query schedule + Schedule schedule = schedulerService.querySchedule(scheduleId); + if (schedule == null || ScheduleState.OFFLINE == schedule.getScheduleState()) { + log.warn( + "Job schedule does not exist in db or process schedule offline,delete schedule job in quartz, scheduleId:{}.", + scheduleId); + deleteJob(context, scheduleId); + return; + } + // start flink job + // ....... + } + + private void deleteJob(JobExecutionContext context, int scheduleId) { + final Scheduler scheduler = context.getScheduler(); + JobKey jobKey = QuartzTaskUtils.getJobKey(scheduleId); + try { + if (scheduler.checkExists(jobKey)) { + log.info("Try to delete job: {}.", scheduleId); + scheduler.deleteJob(jobKey); + } + } catch (Exception e) { + log.error("Failed to delete job: {}.", jobKey); + } + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java new file mode 100644 index 000000000..2ba57af09 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.quartz; + +import org.apache.streampark.common.util.DateUtils; +import org.apache.streampark.console.core.entity.Schedule; + +import com.google.common.base.Strings; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.quartz.CronTrigger; +import org.quartz.JobDetail; +import org.quartz.JobKey; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.TriggerKey; + +import java.util.Date; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.quartz.CronScheduleBuilder.cronSchedule; +import static org.quartz.JobBuilder.newJob; +import static org.quartz.TriggerBuilder.newTrigger; + +@Slf4j +public class QuartzExecutors implements SchedulerApi { + + private final Scheduler scheduler; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + public QuartzExecutors(Scheduler scheduler) { + this.scheduler = scheduler; + } + + @SneakyThrows + @Override + public void start() { + try { + scheduler.start(); + } catch (Exception e) { + throw new SchedulerException("Failed to start quartz scheduler.", e); + } + } + + @SneakyThrows + @Override + public void insertOrUpdateScheduleTask(Schedule schedule) { + JobKey jobKey = QuartzTaskUtils.getJobKey(schedule.getId()); + Map<String, Object> jobDataMap = QuartzTaskUtils.buildDataMap(schedule); + String cronExpression = schedule.getCrontab(); + String timezoneId = schedule.getTimezoneId(); + Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId); + Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId); + + lock.writeLock().lock(); + try { + + JobDetail jobDetail; + if (scheduler.checkExists(jobKey)) { + + jobDetail = scheduler.getJobDetail(jobKey); + jobDetail.getJobDataMap().putAll(jobDataMap); + } else { + jobDetail = newJob(JobScheduleTask.class).withIdentity(jobKey).build(); + + jobDetail.getJobDataMap().putAll(jobDataMap); + + scheduler.addJob(jobDetail, false, true); + + log.info("Add job, job name: {}, group name: {}.", jobKey.getName(), jobKey.getGroup()); + } + + TriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup()); + + CronTrigger cronTrigger = + newTrigger() + .withIdentity(triggerKey) + .startAt(startDate) + .endAt(endDate) + .withSchedule( + cronSchedule(cronExpression) + .withMisfireHandlingInstructionDoNothing() + .inTimeZone(DateUtils.getTimezone(timezoneId).get())) + .forJob(jobDetail) + .build(); + + if (scheduler.checkExists(triggerKey)) { + CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey); + String oldCronExpression = oldCronTrigger.getCronExpression(); + + if (!Strings.nullToEmpty(cronExpression) + .equalsIgnoreCase(Strings.nullToEmpty(oldCronExpression))) { + scheduler.rescheduleJob(triggerKey, cronTrigger); + log.info( + "Reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}", + triggerKey.getName(), + triggerKey.getGroup(), + cronExpression, + startDate, + endDate); + } + } else { + scheduler.scheduleJob(cronTrigger); + log.info( + "Schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}", + triggerKey.getName(), + triggerKey.getGroup(), + cronExpression, + startDate, + endDate); + } + } catch (Exception e) { + throw new SchedulerException("Add schedule job failed.", e); + } finally { + lock.writeLock().unlock(); + } + } + + @SneakyThrows + @Override + public void deleteScheduleTask(int scheduleId) { + JobKey jobKey = QuartzTaskUtils.getJobKey(scheduleId); + try { + if (scheduler.checkExists(jobKey)) { + log.info("Try to delete scheduler task, schedulerId: {}.", scheduleId); + scheduler.deleteJob(jobKey); + } + } catch (Exception e) { + log.error("Failed to delete scheduler task, schedulerId: {}.", scheduleId, e); + throw new SchedulerException("Failed to delete scheduler task."); + } + } + + @Override + public void close() throws Exception { + try { + scheduler.shutdown(); + } catch (org.quartz.SchedulerException e) { + throw new SchedulerException("Failed to shutdown scheduler.", e); + } + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzTaskUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzTaskUtils.java new file mode 100644 index 000000000..a92da37a2 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzTaskUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.quartz; + +import org.apache.streampark.common.util.JsonUtils; +import org.apache.streampark.console.core.entity.Schedule; + +import org.quartz.JobKey; + +import java.util.HashMap; +import java.util.Map; + +public final class QuartzTaskUtils { + + public static final String QUARTZ_JOB_PREFIX = "job"; + public static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup"; + public static final String UNDERLINE = "_"; + public static final String SCHEDULE_ID = "scheduleId"; + public static final String SCHEDULE = "schedule"; + + /** + * @param schedulerId scheduler id + * @return quartz job name + */ + public static JobKey getJobKey(int schedulerId) { + String jobName = QUARTZ_JOB_PREFIX + UNDERLINE + schedulerId; + String jobGroup = QUARTZ_JOB_GROUP_PREFIX + UNDERLINE; + return new JobKey(jobName, jobGroup); + } + + public static Map<String, Object> buildDataMap(Schedule schedule) { + Map<String, Object> dataMap = new HashMap<>(8); + dataMap.put(SCHEDULE_ID, schedule.getId()); + dataMap.put(SCHEDULE, JsonUtils.Marshal(schedule).toJson()); + + return dataMap; + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/SchedulerApi.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/SchedulerApi.java new file mode 100644 index 000000000..d03f0035b --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/SchedulerApi.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.quartz; + +import org.apache.streampark.console.core.entity.Schedule; + +public interface SchedulerApi extends AutoCloseable { + + void start(); + + void insertOrUpdateScheduleTask(Schedule schedule); + + void deleteScheduleTask(int scheduleId); + + void close() throws Exception; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java new file mode 100644 index 000000000..4e5f6583b --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.core.entity.Schedule; +import org.apache.streampark.console.core.enums.ReleaseState; +import org.apache.streampark.console.core.enums.ScheduleState; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.service.IService; + +import java.util.Map; + +public interface SchedulerService extends IService<Schedule> { + + boolean insertSchedule(Long appId, String schedule); + + boolean updateSchedule(Long appId, String scheduleExpression); + + Map<String, Object> setScheduleState(Long appId, ReleaseState scheduleStatus); + + boolean deleteSchedule(int scheduleId); + + Map<String, Object> previewSchedule(String schedule); + + IPage<Schedule> page(Schedule savePoint, ScheduleState request); + + Schedule querySchedule(int id); +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java new file mode 100644 index 000000000..2714eb1eb --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service.impl; + +import org.apache.streampark.console.core.entity.Schedule; +import org.apache.streampark.console.core.enums.ReleaseState; +import org.apache.streampark.console.core.enums.ScheduleState; +import org.apache.streampark.console.core.mapper.SchedulerMapper; +import org.apache.streampark.console.core.service.SchedulerService; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.springframework.stereotype.Service; + +import java.util.Map; + +/** scheduler service impl */ +@Service +public class SchedulerServiceImpl extends ServiceImpl<SchedulerMapper, Schedule> + implements SchedulerService { + + @Override + public boolean insertSchedule(Long appId, String schedule) { + return false; + } + + @Override + public boolean updateSchedule(Long appId, String scheduleExpression) { + return false; + } + + @Override + public Map<String, Object> setScheduleState(Long appId, ReleaseState scheduleStatus) { + return null; + } + + @Override + public boolean deleteSchedule(int scheduleId) { + return false; + } + + @Override + public Map<String, Object> previewSchedule(String schedule) { + return null; + } + + @Override + public IPage<Schedule> page(Schedule savePoint, ScheduleState request) { + return null; + } + + @Override + public Schedule querySchedule(int id) { + return getById(id); + } +} diff --git a/streampark-console/streampark-console-service/src/main/resources/application.yml b/streampark-console/streampark-console-service/src/main/resources/application.yml index 5b832f547..0bbca7653 100644 --- a/streampark-console/streampark-console-service/src/main/resources/application.yml +++ b/streampark-console/streampark-console-service/src/main/resources/application.yml @@ -66,6 +66,27 @@ spring: mvc: converters: preferred-json-mapper: jackson + quartz: + auto-startup: false + job-store-type: jdbc + jdbc: + initialize-schema: never + properties: + org.quartz.threadPool.threadPriority: 5 + org.quartz.jobStore.isClustered: true + org.quartz.jobStore.class: org.springframework.scheduling.quartz.LocalDataSourceJobStore + org.quartz.scheduler.instanceId: AUTO + org.quartz.jobStore.tablePrefix: QRTZ_ + org.quartz.jobStore.acquireTriggersWithinLock: true + org.quartz.scheduler.instanceName: StreamPark + org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool + org.quartz.jobStore.useProperties: false + org.quartz.threadPool.makeThreadsDaemons: true + org.quartz.threadPool.threadCount: 25 + org.quartz.jobStore.misfireThreshold: 60000 + org.quartz.scheduler.makeSchedulerThreadDaemon: true + org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate + org.quartz.jobStore.clusterCheckinInterval: 5000 management: endpoints:
