This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch schedule
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 87214c40356380ee15db3cb2cc2e5bde1c105b30
Author: monster <[email protected]>
AuthorDate: Wed May 17 21:31:23 2023 +0800

    [WIP][Feature]Support offline scheduling
---
 .../apache/streampark/common/util/DateUtils.scala  |  34 ++++-
 .../streampark/console/core/entity/Schedule.java   |  71 +++++++++
 .../console/core/enums/ScheduleState.java          |  46 ++++++
 .../console/core/mapper/SchedulerMapper.java       |  24 ++++
 .../console/core/quartz/JobScheduleTask.java       |  83 +++++++++++
 .../console/core/quartz/QuartzExecutors.java       | 159 +++++++++++++++++++++
 .../console/core/quartz/QuartzTaskUtils.java       |  53 +++++++
 .../console/core/quartz/SchedulerApi.java          |  31 ++++
 .../console/core/service/SchedulerService.java     |  44 ++++++
 .../core/service/impl/SchedulerServiceImpl.java    |  71 +++++++++
 .../src/main/resources/application.yml             |  21 +++
 11 files changed, 636 insertions(+), 1 deletion(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
index 8c62eebe1..fe91ee2b4 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/DateUtils.scala
@@ -16,8 +16,10 @@
  */
 package org.apache.streampark.common.util
 
+import org.apache.commons.lang3.StringUtils
+
 import java.text.{ParseException, SimpleDateFormat}
-import java.time.{Duration, LocalDateTime}
+import java.time.{Duration, LocalDateTime, ZonedDateTime, ZoneId}
 import java.time.format.DateTimeFormatter
 import java.util._
 import java.util.concurrent.TimeUnit
@@ -220,4 +222,34 @@ object DateUtils {
     DateUtils.getDateFormat(d, format)
   }
 
+  /**
+   * Re-interprets the wall-clock reading of a date from one time zone in another.
+   *
+   * The instant held by `date` is read as a local date-time in `sourceTimezoneId`, truncated to
+   * whole seconds (the precision of the "yyyy-MM-dd HH:mm:ss" pattern), and that same local
+   * date-time is then resolved in `targetTimezoneId`. Both ids are resolved with
+   * `TimeZone.getTimeZone`, so an unrecognised id falls back to GMT instead of throwing.
+   *
+   * @param date
+   *   the date to convert; `null` is returned unchanged
+   * @param sourceTimezoneId
+   *   id of the zone whose wall-clock reading of `date` is taken
+   * @param targetTimezoneId
+   *   id of the zone in which that wall-clock reading is re-interpreted
+   * @return
+   *   the converted date, or `date` itself when it is `null` or either zone id is null/empty
+   */
+  def transformTimezoneDate(
+      date: Date,
+      sourceTimezoneId: String,
+      targetTimezoneId: String): Date = {
+    val convertible = date != null &&
+      StringUtils.isNotEmpty(sourceTimezoneId) &&
+      StringUtils.isNotEmpty(targetTimezoneId)
+    if (!convertible) {
+      date
+    } else {
+      val sourceZone = TimeZone.getTimeZone(sourceTimezoneId).toZoneId
+      val targetZone = TimeZone.getTimeZone(targetTimezoneId).toZoneId
+      // getTime works for every java.util.Date subclass, unlike toInstant
+      val instant = java.time.Instant.ofEpochMilli(date.getTime)
+      // drop the sub-second part: the conversion is defined at second precision
+      val localDateTime = LocalDateTime.ofInstant(instant, sourceZone).withNano(0)
+      Date.from(ZonedDateTime.of(localDateTime, targetZone).toInstant)
+    }
+  }
+
+  /**
+   * Converts `date` to `targetTimezoneId` via the three-argument overload, using the id of the
+   * JVM default zone as the source zone.
+   */
+  def transformTimezoneDate(date: Date, targetTimezoneId: String): Date = {
+    val systemZoneId = ZoneId.systemDefault().getId
+    transformTimezoneDate(date, systemZoneId, targetTimezoneId)
+  }
+
+  /**
+   * Looks up the `TimeZone` for the given id.
+   *
+   * @return
+   *   `None` when `timezoneId` is null or empty, otherwise the result of
+   *   `TimeZone.getTimeZone(timezoneId)` wrapped in `Some`
+   */
+  def getTimezone(timezoneId: String): Option[TimeZone] = {
+    if (StringUtils.isEmpty(timezoneId)) None
+    else Some(TimeZone.getTimeZone(timezoneId))
+  }
+
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
new file mode 100644
index 000000000..903088f5b
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Schedule.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import org.apache.streampark.console.core.enums.ScheduleState;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Data;
+
+import java.util.Date;
+
+/**
+ * Entity mapped to the {@code t_flink_schedules} table; accessors are generated by Lombok's
+ * {@code @Data}.
+ */
+@TableName("t_flink_schedules")
+@Data
+public class Schedule {
+
+  /** primary key, generated with {@link IdType#AUTO} */
+  @TableId(value = "id", type = IdType.AUTO)
+  private int id;
+
+  /** presumably the id of the application this schedule belongs to (TODO confirm) */
+  private Long appId;
+
+  /** description; declared with {@code exist = false}, i.e. not a column of the table */
+  @TableField(exist = false)
+  private String description;
+
+  /** schedule start time; JSON format "yyyy-MM-dd HH:mm:ss" rendered in GMT+8 */
+  @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+  private Date startTime;
+
+  /** schedule end time; JSON format "yyyy-MM-dd HH:mm:ss" rendered in GMT+8 */
+  @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+  private Date endTime;
+
+  /**
+   * timezoneId
+   *
+   * <p>see {@link java.util.TimeZone#getTimeZone(String)}
+   */
+  private String timezoneId;
+
+  /** crontab expression */
+  private String crontab;
+
+  /** create time; JSON format "yyyy-MM-dd HH:mm:ss" rendered in GMT+8 */
+  @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+  private Date createTime;
+
+  /** update time; JSON format "yyyy-MM-dd HH:mm:ss" rendered in GMT+8 */
+  @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+  private Date updateTime;
+
+  /** schedule state, see {@link ScheduleState} (offline or online) */
+  private ScheduleState scheduleState;
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ScheduleState.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ScheduleState.java
new file mode 100644
index 000000000..4ff720673
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ScheduleState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.enums;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+import lombok.Getter;
+
+/**
+ * Online/offline state of a schedule.
+ *
+ * <p>{@code code} carries {@code @EnumValue}, which marks it as the value MyBatis-Plus stores.
+ */
+public enum ScheduleState {
+
+  /** 0 offline 1 online */
+  OFFLINE(0, "offline"),
+  ONLINE(1, "online");
+
+  ScheduleState(int code, String descp) {
+    this.code = code;
+    this.descp = descp;
+  }
+
+  @Getter @EnumValue private final int code;
+  @Getter private final String descp;
+
+  /**
+   * Looks up the state whose {@code code} equals the given value.
+   *
+   * @param value the numeric code to resolve
+   * @return the matching state, or {@code null} when no state has that code
+   */
+  public static ScheduleState getEnum(int value) {
+    for (ScheduleState state : ScheduleState.values()) {
+      // compare against the declared code, not ordinal(), so reordering constants stays safe
+      if (state.code == value) {
+        return state;
+      }
+    }
+    // For values out of enum scope
+    return null;
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
new file mode 100644
index 000000000..ac672e345
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/SchedulerMapper.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.mapper;
+
+import org.apache.streampark.console.core.entity.Schedule;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+/**
+ * MyBatis-Plus mapper for {@link Schedule}; it declares no methods of its own beyond those
+ * inherited from {@link BaseMapper}.
+ */
+public interface SchedulerMapper extends BaseMapper<Schedule> {}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
new file mode 100644
index 000000000..928b058ae
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/JobScheduleTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.quartz;
+
+import org.apache.streampark.console.core.entity.Schedule;
+import org.apache.streampark.console.core.enums.ScheduleState;
+import org.apache.streampark.console.core.service.SchedulerService;
+
+import lombok.extern.slf4j.Slf4j;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.springframework.scheduling.quartz.QuartzJobBean;
+
+import java.util.Date;
+
+/**
+ * Quartz job executed when a schedule's trigger fires.
+ *
+ * <p>The job reads the schedule id from its {@link JobDataMap}, loads the schedule through
+ * {@link SchedulerService} and removes itself from Quartz when the schedule no longer exists or
+ * is {@link ScheduleState#OFFLINE}. Starting the Flink job is not implemented yet.
+ */
+@Slf4j
+public class JobScheduleTask extends QuartzJobBean {
+
+  private final SchedulerService schedulerService;
+
+  // NOTE(review): this class has no no-arg constructor; confirm that the configured Quartz
+  // JobFactory is able to supply a SchedulerService when it instantiates the job.
+  public JobScheduleTask(SchedulerService schedulerService) {
+    this.schedulerService = schedulerService;
+  }
+
+  /**
+   * Logs the fire times, then deletes the Quartz job if its schedule is missing or offline.
+   *
+   * @param context execution context supplying the job data map, fire times and scheduler
+   */
+  @Override
+  protected void executeInternal(JobExecutionContext context) {
+    JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+    int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID);
+
+    Date scheduledFireTime = context.getScheduledFireTime();
+    Date fireTime = context.getFireTime();
+
+    log.info(
+        "Scheduled fire time :{}, fire time :{}, schedule id :{}.",
+        scheduledFireTime,
+        fireTime,
+        scheduleId);
+
+    // query schedule
+    Schedule schedule = schedulerService.querySchedule(scheduleId);
+    if (schedule == null || ScheduleState.OFFLINE == schedule.getScheduleState()) {
+      log.warn(
+          "Job schedule does not exist in db or process schedule offline,"
+              + "delete schedule job in quartz, scheduleId:{}.",
+          scheduleId);
+      deleteJob(context, scheduleId);
+      return;
+    }
+    // start flink job
+    // .......
+  }
+
+  /**
+   * Best-effort removal of the Quartz job that belongs to the given schedule; a failure is
+   * logged together with its cause and not propagated.
+   */
+  private void deleteJob(JobExecutionContext context, int scheduleId) {
+    final Scheduler scheduler = context.getScheduler();
+    JobKey jobKey = QuartzTaskUtils.getJobKey(scheduleId);
+    try {
+      if (scheduler.checkExists(jobKey)) {
+        log.info("Try to delete job: {}.", scheduleId);
+        scheduler.deleteJob(jobKey);
+      }
+    } catch (Exception e) {
+      // pass the exception as last argument so the stack trace is not lost
+      log.error("Failed to delete job: {}.", jobKey, e);
+    }
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
new file mode 100644
index 000000000..2ba57af09
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzExecutors.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.quartz;
+
+import org.apache.streampark.common.util.DateUtils;
+import org.apache.streampark.console.core.entity.Schedule;
+
+import com.google.common.base.Strings;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.quartz.CronTrigger;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerKey;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.quartz.CronScheduleBuilder.cronSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.TriggerBuilder.newTrigger;
+
+/**
+ * {@link SchedulerApi} implementation backed by a Quartz {@link Scheduler}.
+ *
+ * <p>Each schedule is mapped to one Quartz job and one cron trigger that share the name and
+ * group of the key returned by {@link QuartzTaskUtils#getJobKey(int)}.
+ */
+@Slf4j
+public class QuartzExecutors implements SchedulerApi {
+
+  private final Scheduler scheduler;
+
+  /** Its write lock is held while a job and its trigger are inserted or updated. */
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  public QuartzExecutors(Scheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+  /** Starts the wrapped scheduler; any failure is rethrown as a {@link SchedulerException}. */
+  @SneakyThrows
+  @Override
+  public void start() {
+    try {
+      scheduler.start();
+    } catch (Exception e) {
+      throw new SchedulerException("Failed to start quartz scheduler.", e);
+    }
+  }
+
+  /**
+   * Creates the Quartz job and cron trigger for a schedule, or updates them when they exist.
+   *
+   * <p>An existing trigger is only rescheduled when the cron expression differs
+   * (case-insensitively) from the stored one.
+   *
+   * @param schedule the schedule supplying id, crontab, time zone id and start/end time
+   */
+  @SneakyThrows
+  @Override
+  public void insertOrUpdateScheduleTask(Schedule schedule) {
+    JobKey jobKey = QuartzTaskUtils.getJobKey(schedule.getId());
+    Map<String, Object> jobDataMap = QuartzTaskUtils.buildDataMap(schedule);
+    String cronExpression = schedule.getCrontab();
+    String timezoneId = schedule.getTimezoneId();
+    Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
+    Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);
+
+    lock.writeLock().lock();
+    try {
+      JobDetail jobDetail;
+      if (scheduler.checkExists(jobKey)) {
+        jobDetail = scheduler.getJobDetail(jobKey);
+        jobDetail.getJobDataMap().putAll(jobDataMap);
+        // getJobDetail returns a snapshot, so the merged data map has to be stored again
+        scheduler.addJob(jobDetail, true, true);
+      } else {
+        jobDetail = newJob(JobScheduleTask.class).withIdentity(jobKey).build();
+        jobDetail.getJobDataMap().putAll(jobDataMap);
+        scheduler.addJob(jobDetail, false, true);
+        log.info("Add job, job name: {}, group name: {}.", jobKey.getName(), jobKey.getGroup());
+      }
+
+      TriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup());
+
+      CronTrigger cronTrigger =
+          newTrigger()
+              .withIdentity(triggerKey)
+              .startAt(startDate)
+              .endAt(endDate)
+              .withSchedule(
+                  cronSchedule(cronExpression)
+                      .withMisfireHandlingInstructionDoNothing()
+                      .inTimeZone(resolveTimeZone(timezoneId)))
+              .forJob(jobDetail)
+              .build();
+
+      if (scheduler.checkExists(triggerKey)) {
+        CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
+        String oldCronExpression = oldCronTrigger.getCronExpression();
+
+        if (!Strings.nullToEmpty(cronExpression)
+            .equalsIgnoreCase(Strings.nullToEmpty(oldCronExpression))) {
+          scheduler.rescheduleJob(triggerKey, cronTrigger);
+          log.info(
+              "Reschedule job trigger, triggerName: {}, triggerGroupName: {}, "
+                  + "cronExpression: {}, startDate: {}, endDate: {}",
+              triggerKey.getName(),
+              triggerKey.getGroup(),
+              cronExpression,
+              startDate,
+              endDate);
+        }
+      } else {
+        scheduler.scheduleJob(cronTrigger);
+        log.info(
+            "Schedule job trigger, triggerName: {}, triggerGroupName: {}, "
+                + "cronExpression: {}, startDate: {}, endDate: {}",
+            triggerKey.getName(),
+            triggerKey.getGroup(),
+            cronExpression,
+            startDate,
+            endDate);
+      }
+    } catch (Exception e) {
+      throw new SchedulerException("Add schedule job failed.", e);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Deletes the Quartz job of the given schedule if it exists.
+   *
+   * @param scheduleId id of the schedule whose job is removed
+   */
+  @SneakyThrows
+  @Override
+  public void deleteScheduleTask(int scheduleId) {
+    JobKey jobKey = QuartzTaskUtils.getJobKey(scheduleId);
+    try {
+      if (scheduler.checkExists(jobKey)) {
+        log.info("Try to delete scheduler task, schedulerId: {}.", scheduleId);
+        scheduler.deleteJob(jobKey);
+      }
+    } catch (Exception e) {
+      log.error("Failed to delete scheduler task, schedulerId: {}.", scheduleId, e);
+      // keep the cause so callers can see why the deletion failed
+      throw new SchedulerException("Failed to delete scheduler task.", e);
+    }
+  }
+
+  /** Shuts the wrapped scheduler down. */
+  @Override
+  public void close() throws Exception {
+    try {
+      scheduler.shutdown();
+    } catch (SchedulerException e) {
+      throw new SchedulerException("Failed to shutdown scheduler.", e);
+    }
+  }
+
+  /**
+   * Resolves the trigger time zone, falling back to the JVM default zone when the schedule has
+   * no time zone id. The start/end dates are left untouched in that same case, so both stay in
+   * the default zone.
+   */
+  private static java.util.TimeZone resolveTimeZone(String timezoneId) {
+    scala.Option<java.util.TimeZone> timeZone = DateUtils.getTimezone(timezoneId);
+    return timeZone.isDefined() ? timeZone.get() : java.util.TimeZone.getDefault();
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzTaskUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzTaskUtils.java
new file mode 100644
index 000000000..a92da37a2
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/QuartzTaskUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.quartz;
+
+import org.apache.streampark.common.util.JsonUtils;
+import org.apache.streampark.console.core.entity.Schedule;
+
+import org.quartz.JobKey;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Static helpers that derive Quartz job keys and job data from a {@link Schedule}. */
+public final class QuartzTaskUtils {
+
+  public static final String QUARTZ_JOB_PREFIX = "job";
+  public static final String QUARTZ_JOB_GROUP_PREFIX = "jobgroup";
+  public static final String UNDERLINE = "_";
+  public static final String SCHEDULE_ID = "scheduleId";
+  public static final String SCHEDULE = "schedule";
+
+  /** Not instantiable: the class only holds constants and static methods. */
+  private QuartzTaskUtils() {}
+
+  /**
+   * Builds the Quartz job key of a schedule: the name is {@code job_<id>}, the group is the
+   * fixed value {@code jobgroup_}.
+   *
+   * @param schedulerId id of the schedule
+   * @return the job key for that schedule
+   */
+  public static JobKey getJobKey(int schedulerId) {
+    String jobName = QUARTZ_JOB_PREFIX + UNDERLINE + schedulerId;
+    String jobGroup = QUARTZ_JOB_GROUP_PREFIX + UNDERLINE;
+    return new JobKey(jobName, jobGroup);
+  }
+
+  /**
+   * Builds the job data for a schedule.
+   *
+   * @param schedule the schedule to describe
+   * @return a map holding the schedule id under {@link #SCHEDULE_ID} and the schedule serialized
+   *     to JSON under {@link #SCHEDULE}
+   */
+  public static Map<String, Object> buildDataMap(Schedule schedule) {
+    Map<String, Object> dataMap = new HashMap<>(8);
+    dataMap.put(SCHEDULE_ID, schedule.getId());
+    dataMap.put(SCHEDULE, JsonUtils.Marshal(schedule).toJson());
+
+    return dataMap;
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/SchedulerApi.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/SchedulerApi.java
new file mode 100644
index 000000000..d03f0035b
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/quartz/SchedulerApi.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.quartz;
+
+import org.apache.streampark.console.core.entity.Schedule;
+
+/** Operations for registering schedules with an underlying scheduler. */
+public interface SchedulerApi extends AutoCloseable {
+
+  /** Starts the underlying scheduler. */
+  void start();
+
+  /** Registers the given schedule, or updates it when it is already registered. */
+  void insertOrUpdateScheduleTask(Schedule schedule);
+
+  /** Removes the task registered for the given schedule id. */
+  void deleteScheduleTask(int scheduleId);
+
+  /** Shuts the underlying scheduler down. */
+  void close() throws Exception;
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
new file mode 100644
index 000000000..4e5f6583b
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SchedulerService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.core.entity.Schedule;
+import org.apache.streampark.console.core.enums.ReleaseState;
+import org.apache.streampark.console.core.enums.ScheduleState;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+import java.util.Map;
+
+/**
+ * Service for {@link Schedule} entities.
+ *
+ * <p>NOTE(review): the descriptions below are inferred from the method names only; confirm them
+ * against the implementation.
+ */
+public interface SchedulerService extends IService<Schedule> {
+
+  /** Presumably creates a schedule for the given application (TODO confirm). */
+  boolean insertSchedule(Long appId, String schedule);
+
+  /** Presumably updates the schedule of the given application (TODO confirm). */
+  boolean updateSchedule(Long appId, String scheduleExpression);
+
+  // NOTE(review): takes ReleaseState although the entity stores ScheduleState - confirm.
+  Map<String, Object> setScheduleState(Long appId, ReleaseState 
scheduleStatus);
+
+  /** Presumably deletes the schedule with the given id (TODO confirm). */
+  boolean deleteSchedule(int scheduleId);
+
+  /** Presumably previews the fire times of a schedule definition (TODO confirm). */
+  Map<String, Object> previewSchedule(String schedule);
+
+  /** Presumably returns one page of schedules (TODO confirm the meaning of both parameters). */
+  IPage<Schedule> page(Schedule savePoint, ScheduleState request);
+
+  /** Returns the schedule with the given id. */
+  Schedule querySchedule(int id);
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
new file mode 100644
index 000000000..2714eb1eb
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SchedulerServiceImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.console.core.entity.Schedule;
+import org.apache.streampark.console.core.enums.ReleaseState;
+import org.apache.streampark.console.core.enums.ScheduleState;
+import org.apache.streampark.console.core.mapper.SchedulerMapper;
+import org.apache.streampark.console.core.service.SchedulerService;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+/**
+ * scheduler service impl
+ *
+ * <p>Only {@link #querySchedule(int)} is implemented; every other method is a stub that returns
+ * {@code false} or {@code null}.
+ */
+@Service
+public class SchedulerServiceImpl extends ServiceImpl<SchedulerMapper, 
Schedule>
+    implements SchedulerService {
+
+  // stub: not implemented yet, always returns false
+  @Override
+  public boolean insertSchedule(Long appId, String schedule) {
+    return false;
+  }
+
+  // stub: not implemented yet, always returns false
+  @Override
+  public boolean updateSchedule(Long appId, String scheduleExpression) {
+    return false;
+  }
+
+  // stub: not implemented yet, always returns null
+  @Override
+  public Map<String, Object> setScheduleState(Long appId, ReleaseState 
scheduleStatus) {
+    return null;
+  }
+
+  // stub: not implemented yet, always returns false
+  @Override
+  public boolean deleteSchedule(int scheduleId) {
+    return false;
+  }
+
+  // stub: not implemented yet, always returns null
+  @Override
+  public Map<String, Object> previewSchedule(String schedule) {
+    return null;
+  }
+
+  // stub: not implemented yet, always returns null
+  @Override
+  public IPage<Schedule> page(Schedule savePoint, ScheduleState request) {
+    return null;
+  }
+
+  /** Loads the schedule with the given id via {@code getById}. */
+  @Override
+  public Schedule querySchedule(int id) {
+    return getById(id);
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/application.yml
 
b/streampark-console/streampark-console-service/src/main/resources/application.yml
index 5b832f547..0bbca7653 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/application.yml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/application.yml
@@ -66,6 +66,27 @@ spring:
   mvc:
     converters:
       preferred-json-mapper: jackson
+  quartz:
+    auto-startup: false
+    job-store-type: jdbc
+    jdbc:
+      initialize-schema: never
+    properties:
+      org.quartz.threadPool.threadPriority: 5
+      org.quartz.jobStore.isClustered: true
+      org.quartz.jobStore.class: org.springframework.scheduling.quartz.LocalDataSourceJobStore
+      org.quartz.scheduler.instanceId: AUTO
+      org.quartz.jobStore.tablePrefix: QRTZ_
+      org.quartz.jobStore.acquireTriggersWithinLock: true
+      org.quartz.scheduler.instanceName: StreamPark
+      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
+      org.quartz.jobStore.useProperties: false
+      org.quartz.threadPool.makeThreadsDaemons: true
+      org.quartz.threadPool.threadCount: 25
+      org.quartz.jobStore.misfireThreshold: 60000
+      org.quartz.scheduler.makeSchedulerThreadDaemon: true
+      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
+      org.quartz.jobStore.clusterCheckinInterval: 5000
 
 management:
   endpoints:

Reply via email to