This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/dev-offline-sync by this push:
     new 8f75bf6708 [INLONG-10514][Manager] Support built-in schedule engine 
trigger submitting of Flink batch job (#10515)
8f75bf6708 is described below

commit 8f75bf6708b323b8b7612075ca62cd589ac96f80
Author: AloysZhang <[email protected]>
AuthorDate: Thu Jun 27 11:45:21 2024 +0800

    [INLONG-10514][Manager] Support built-in schedule engine trigger submitting 
of Flink batch job (#10515)
---
 .../client/api/inner/client/InlongGroupClient.java |  5 +++
 .../manager/client/api/service/InlongGroupApi.java |  3 ++
 .../manager/common/consts/InlongConstants.java     |  2 +-
 .../plugin/offline/FlinkOfflineJobOperator.java    |  2 +-
 .../inlong/manager/plugin/util/FlinkUtils.java     | 20 ++++++++-
 inlong-manager/manager-schedule/pom.xml            |  5 +++
 .../schedule/quartz/QuartzOfflineSyncJob.java      | 49 ++++++++++++++++++++--
 .../schedule/quartz/QuartzScheduleEngine.java      | 27 +++++++++---
 .../schedule/quartz/QuartzSchedulerListener.java   | 36 +++++++++-------
 .../manager/schedule/util/ScheduleUtils.java       | 22 +++++++++-
 .../manager/schedule/util/ScheduleUtilsTest.java   |  6 +--
 .../schedule/GroupScheduleResourceListener.java    |  4 +-
 .../service/schedule/ScheduleOperatorImpl.java     |  3 +-
 .../src/test/resources/application.properties      |  7 ++++
 14 files changed, 156 insertions(+), 35 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index 54b61f91c2..fcc907396e 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -320,4 +320,9 @@ public class InlongGroupClient {
         return response.getData();
     }
 
+    public Boolean submitOfflineJob(String groupId) {
+        Response<Boolean> responseBody = 
ClientUtils.executeHttpCall(inlongGroupApi.submitOfflineJob(groupId));
+        ClientUtils.assertRespSuccess(responseBody);
+        return responseBody.getData();
+    }
 }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
index 819d9802d9..930a3a8144 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
@@ -98,4 +98,7 @@ public interface InlongGroupApi {
 
     @GET("group/switch/finish/{groupId}")
     Call<Response<Boolean>> finishTagSwitch(@Path("groupId") String groupId);
+
+    @POST("group/submitOfflineJob/{groupId}")
+    Call<Response<Boolean>> submitOfflineJob(@Path("groupId") String groupId);
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index e9a18439d2..4abcc0f359 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -90,7 +90,7 @@ public class InlongConstants {
     public static final Integer DATASYNC_REALTIME_MODE = 1;
     public static final Integer DATASYNC_OFFLINE_MODE = 2;
 
-    public static final String RUNTIME_EXECUTION_MODE_STREAMING = "streaming";
+    public static final String RUNTIME_EXECUTION_MODE_STREAM = "stream";
     public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch";
 
     public static final Integer DISABLE_ZK = 0;
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
index 09740c53f3..1c93e7636a 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
@@ -31,6 +31,6 @@ public class FlinkOfflineJobOperator implements 
OfflineJobOperator {
 
     @Override
     public void submitOfflineJob(String groupId, List<InlongStreamInfo> 
streamInfoList) throws Exception {
-        submitFlinkJobs(groupId, streamInfoList);
+        submitFlinkJobs(groupId, streamInfoList, true);
     }
 }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 82a0d628c1..fd0c2c8bf2 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -58,6 +58,8 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static 
org.apache.inlong.manager.common.consts.InlongConstants.RUNTIME_EXECUTION_MODE_BATCH;
+import static 
org.apache.inlong.manager.common.consts.InlongConstants.RUNTIME_EXECUTION_MODE_STREAM;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN;
 import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION;
@@ -220,6 +222,11 @@ public class FlinkUtils {
 
     public static ListenerResult submitFlinkJobs(String groupId, 
List<InlongStreamInfo> streamInfoList)
             throws Exception {
+        return submitFlinkJobs(groupId, streamInfoList, false);
+    }
+
+    public static ListenerResult submitFlinkJobs(String groupId, 
List<InlongStreamInfo> streamInfoList,
+            boolean isBatchJob) throws Exception {
         int sinkCount = streamInfoList.stream()
                 .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
                 .reduce(0, Integer::sum);
@@ -232,7 +239,8 @@ public class FlinkUtils {
 
         List<ListenerResult> listenerResults = new ArrayList<>();
         for (InlongStreamInfo streamInfo : streamInfoList) {
-            listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo, 
FlinkUtils.genFlinkJobName(streamInfo)));
+            listenerResults.add(
+                    FlinkUtils.submitFlinkJob(streamInfo, 
FlinkUtils.genFlinkJobName(streamInfo), isBatchJob));
         }
 
         // only one stream in group for now
@@ -246,6 +254,11 @@ public class FlinkUtils {
     }
 
     public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, 
String jobName) throws Exception {
+        return submitFlinkJob(streamInfo, jobName, false);
+    }
+
+    public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, 
String jobName, boolean isBatchJob)
+            throws Exception {
         List<StreamSink> sinkList = streamInfo.getSinkList();
         List<String> sinkTypes = 
sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
         if (CollectionUtils.isEmpty(sinkList) || 
!SinkType.containSortFlinkSink(sinkTypes)) {
@@ -283,6 +296,11 @@ public class FlinkUtils {
         String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
         
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
+        if (isBatchJob) {
+            flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_BATCH);
+        } else {
+            flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_STREAM);
+        }
         FlinkOperation flinkOperation = FlinkOperation.getInstance();
         try {
             flinkOperation.genPath(flinkInfo, dataflow);
diff --git a/inlong-manager/manager-schedule/pom.xml 
b/inlong-manager/manager-schedule/pom.xml
index 8d598b47ca..8b22e39ee4 100644
--- a/inlong-manager/manager-schedule/pom.xml
+++ b/inlong-manager/manager-schedule/pom.xml
@@ -38,6 +38,11 @@
             <artifactId>manager-pojo</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>manager-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.quartz-scheduler</groupId>
             <artifactId>quartz</artifactId>
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
index 8ac9d3b6fb..2b66766474 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
@@ -17,24 +17,67 @@
 
 package org.apache.inlong.manager.schedule.quartz;
 
-import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.auth.DefaultAuthentication;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.quartz.Job;
+import org.quartz.JobDataMap;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import static 
org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_HOST;
+import static 
org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_PORT;
+import static org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_ID;
+import static 
org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_KEY;
 
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
+@Service
 public class QuartzOfflineSyncJob implements Job {
 
-    private ScheduleInfo scheduleInfo;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(QuartzOfflineSyncJob.class);
+
+    private volatile InlongGroupClient groupClient;
 
     @Override
     public void execute(JobExecutionContext context) throws 
JobExecutionException {
-        // TODO: complete the offline sync logic
+        LOGGER.info("QuartzOfflineSyncJob run once");
+        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
+        initGroupClientIfNeeded(jobDataMap);
+        String inlongGroupId = context.getJobDetail().getKey().getName();
+        LOGGER.info("Starting submit offline job for group {}", inlongGroupId);
+        if (groupClient.submitOfflineJob(inlongGroupId)) {
+            LOGGER.info("Successfully submitting offline job for group {}", 
inlongGroupId);
+        } else {
+            LOGGER.warn("Failed to submit offline job for group {}", 
inlongGroupId);
+        }
+    }
+
+    private void initGroupClientIfNeeded(JobDataMap jobDataMap) {
+        if (groupClient == null) {
+            String host = (String) jobDataMap.get(MANAGER_HOST);
+            int port = (int) jobDataMap.get(MANAGER_PORT);
+            String secreteId = (String) jobDataMap.get(SECRETE_ID);
+            String secreteKey = (String) jobDataMap.get(SECRETE_KEY);
+            LOGGER.info("Initializing Inlong group client, with host: {}, 
port: {}, userName : {}",
+                    host, port, secreteId);
+            ClientConfiguration configuration = new ClientConfiguration();
+            configuration.setAuthentication(new 
DefaultAuthentication(secreteId, secreteKey));
+            String serviceUrl = host + ":" + port;
+            InlongClientImpl inlongClient = new InlongClientImpl(serviceUrl, 
configuration);
+            ClientFactory clientFactory = 
ClientUtils.getClientFactory(inlongClient.getConfiguration());
+            groupClient = clientFactory.getGroupClient();
+        }
     }
 }
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
index e8bb1085ce..58f6e7c176 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
@@ -32,6 +32,7 @@ import org.quartz.Trigger;
 import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.util.HashSet;
@@ -50,6 +51,18 @@ public class QuartzScheduleEngine implements ScheduleEngine {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(QuartzScheduleEngine.class);
 
+    @Value("${server.host:127.0.0.1}")
+    private String host;
+
+    @Value("${server.port:8083}")
+    private int port;
+
+    @Value("${inlong.inner.secrete.id:admin}")
+    private String secretId;
+
+    @Value("${inlong.inner.secrete.key:87haw3VYTPqK5fK0}")
+    private String secretKey;
+
     private final Scheduler scheduler;
     private final Set<String> scheduledJobSet = new HashSet<>();
 
@@ -60,6 +73,7 @@ public class QuartzScheduleEngine implements ScheduleEngine {
         } catch (SchedulerException e) {
             throw new QuartzScheduleException("Failed to init quartz scheduler 
", e);
         }
+        start();
     }
 
     @Override
@@ -68,7 +82,8 @@ public class QuartzScheduleEngine implements ScheduleEngine {
             // add listener
             scheduler.getListenerManager().addSchedulerListener(new 
QuartzSchedulerListener(this));
             scheduler.start();
-            LOGGER.info("Quartz scheduler engine started");
+            LOGGER.info("Quartz scheduler engine started, inlong manager host 
{}, port {}, secretId {}",
+                    host, port, secretId);
         } catch (SchedulerException e) {
             throw new QuartzScheduleException("Failed to start quartz 
scheduler ", e);
         }
@@ -79,7 +94,7 @@ public class QuartzScheduleEngine implements ScheduleEngine {
      * */
     public boolean triggerFinalized(Trigger trigger) {
         String jobName = trigger.getJobKey().getName();
-        LOGGER.info("Trigger finalized for job {}", jobName);
+        LOGGER.info("Quartz trigger finalized for job {}", jobName);
         return scheduledJobSet.remove(jobName);
     }
 
@@ -97,12 +112,12 @@ public class QuartzScheduleEngine implements 
ScheduleEngine {
         if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) {
             throw new QuartzScheduleException("Group " + 
scheduleInfo.getInlongGroupId() + " is already registered");
         }
-        JobDetail jobDetail = genQuartzJobDetail(scheduleInfo, clz);
+        JobDetail jobDetail = genQuartzJobDetail(scheduleInfo, clz, host, 
port, secretId, secretKey);
         Trigger trigger = genQuartzTrigger(jobDetail, scheduleInfo);
         try {
             scheduler.scheduleJob(jobDetail, trigger);
             scheduledJobSet.add(scheduleInfo.getInlongGroupId());
-            LOGGER.info("Registered new schedule info for {}", 
scheduleInfo.getInlongGroupId());
+            LOGGER.info("Registered new quartz schedule info for {}", 
scheduleInfo.getInlongGroupId());
         } catch (SchedulerException e) {
             throw new QuartzScheduleException(e.getMessage());
         }
@@ -123,7 +138,7 @@ public class QuartzScheduleEngine implements ScheduleEngine 
{
             }
         }
         scheduledJobSet.remove(groupId);
-        LOGGER.info("Un-registered schedule info for {}", groupId);
+        LOGGER.info("Un-registered quartz schedule info for {}", groupId);
         return true;
     }
 
@@ -140,7 +155,7 @@ public class QuartzScheduleEngine implements ScheduleEngine 
{
     public boolean handleUpdate(ScheduleInfo scheduleInfo, Class<? extends 
Job> clz) {
         handleUnregister(scheduleInfo.getInlongGroupId());
         handleRegister(scheduleInfo, clz);
-        LOGGER.info("Updated schedule info for {}", 
scheduleInfo.getInlongGroupId());
+        LOGGER.info("Updated quartz schedule info for {}", 
scheduleInfo.getInlongGroupId());
         return false;
     }
 
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java
index 49a9583eaf..6d43b54f9c 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java
@@ -23,13 +23,17 @@ import org.quartz.SchedulerException;
 import org.quartz.SchedulerListener;
 import org.quartz.Trigger;
 import org.quartz.TriggerKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation for quartz scheduler listener.
  * */
 public class QuartzSchedulerListener implements SchedulerListener {
 
-    QuartzScheduleEngine quartzScheduleEngine;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(QuartzSchedulerListener.class);
+
+    private QuartzScheduleEngine quartzScheduleEngine;
 
     public QuartzSchedulerListener(QuartzScheduleEngine quartzScheduleEngine) {
         this.quartzScheduleEngine = quartzScheduleEngine;
@@ -37,22 +41,24 @@ public class QuartzSchedulerListener implements 
SchedulerListener {
 
     @Override
     public void jobScheduled(Trigger trigger) {
-
+        LOGGER.info("Quartz job with key {} scheduled", 
trigger.getKey().getName());
     }
 
     @Override
     public void jobUnscheduled(TriggerKey triggerKey) {
-
+        LOGGER.info("Quartz job with key {} un-scheduled", 
triggerKey.getName());
     }
 
     @Override
     public void triggerFinalized(Trigger trigger) {
         quartzScheduleEngine.triggerFinalized(trigger);
+        LOGGER.info("Quartz trigger with key {} startTime {} ande endTime {} 
is finalized",
+                trigger.getKey().getName(), trigger.getStartTime(), 
trigger.getEndTime());
     }
 
     @Override
     public void triggerPaused(TriggerKey triggerKey) {
-
+        LOGGER.info("Quartz trigger with key {} paused", triggerKey.getName());
     }
 
     @Override
@@ -62,7 +68,7 @@ public class QuartzSchedulerListener implements 
SchedulerListener {
 
     @Override
     public void triggerResumed(TriggerKey triggerKey) {
-
+        LOGGER.info("Quartz trigger with key {} Resume", triggerKey.getName());
     }
 
     @Override
@@ -72,17 +78,17 @@ public class QuartzSchedulerListener implements 
SchedulerListener {
 
     @Override
     public void jobAdded(JobDetail jobDetail) {
-
+        LOGGER.info("New quartz job added, name {}", 
jobDetail.getKey().getName());
     }
 
     @Override
     public void jobDeleted(JobKey jobKey) {
-
+        LOGGER.info("Quartz job deleted, name {}", jobKey.getName());
     }
 
     @Override
     public void jobPaused(JobKey jobKey) {
-
+        LOGGER.info("Quartz job paused, name {}", jobKey.getName());
     }
 
     @Override
@@ -92,7 +98,7 @@ public class QuartzSchedulerListener implements 
SchedulerListener {
 
     @Override
     public void jobResumed(JobKey jobKey) {
-
+        LOGGER.info("Quartz job resumed, name {}", jobKey.getName());
     }
 
     @Override
@@ -102,7 +108,7 @@ public class QuartzSchedulerListener implements 
SchedulerListener {
 
     @Override
     public void schedulerError(String msg, SchedulerException cause) {
-
+        LOGGER.warn("Quartz schedule exception, errorMsg {}", msg, cause);
     }
 
     @Override
@@ -112,26 +118,26 @@ public class QuartzSchedulerListener implements 
SchedulerListener {
 
     @Override
     public void schedulerStarted() {
-
+        LOGGER.warn("Quartz scheduler started");
     }
 
     @Override
     public void schedulerStarting() {
-
+        LOGGER.warn("Quartz scheduler starting");
     }
 
     @Override
     public void schedulerShutdown() {
-
+        LOGGER.warn("Quartz scheduler shutdown");
     }
 
     @Override
     public void schedulerShuttingdown() {
-
+        LOGGER.warn("Quartz scheduler shutting down");
     }
 
     @Override
     public void schedulingDataCleared() {
-
+        LOGGER.warn("Quartz scheduler data cleared");
     }
 }
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
index 1e4f43983e..2cd2a80b39 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
@@ -33,6 +33,8 @@ import org.quartz.SimpleScheduleBuilder;
 import org.quartz.SimpleTrigger;
 import org.quartz.Trigger;
 import org.quartz.TriggerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.Timestamp;
 import java.util.Date;
@@ -42,9 +44,21 @@ import java.util.Date;
  * */
 public class ScheduleUtils {
 
-    public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo, 
Class<? extends Job> clz) {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ScheduleUtils.class);
+
+    public static final String MANAGER_HOST = "HOST";
+    public static final String MANAGER_PORT = "PORT";
+    public static final String SECRETE_ID = "SECRETE_ID";
+    public static final String SECRETE_KEY = "SECRETE_KEY";
+
+    public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo, 
Class<? extends Job> clz,
+            String host, Integer port, String secreteId, String secreteKey) {
         return JobBuilder.newJob(clz)
                 .withIdentity(scheduleInfo.getInlongGroupId())
+                .usingJobData(MANAGER_HOST, host)
+                .usingJobData(MANAGER_PORT, port)
+                .usingJobData(SECRETE_ID, secreteId)
+                .usingJobData(SECRETE_KEY, secreteKey)
                 .build();
     }
 
@@ -57,6 +71,12 @@ public class ScheduleUtils {
         if (type == null) {
             throw new QuartzScheduleException("Invalid schedule type: " + 
scheduleType);
         }
+        LOGGER.info("Creating quartz trigger for key : {}, startTime : {}, 
endTime : {}, scheduleTYpe : {}, "
+                + "scheduleUnit : {}, scheduleInterval : {}, crontabExpression 
: {}",
+                key, startTime, endTime, type.name(),
+                scheduleInfo.getScheduleUnit(),
+                scheduleInfo.getScheduleInterval(),
+                scheduleInfo.getCrontabExpression());
         switch (type) {
             case NORMAL:
                 return TriggerBuilder.newTrigger()
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
index da8fd66c61..b515b7ae92 100644
--- 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
@@ -102,7 +102,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest {
     @Test
     public void testGenJobDetail() {
         ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
-        JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
MockJob.class);
+        JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
MockJob.class, null, null, null, null);
         assertNotNull(jobDetail);
 
         JobKey jobKey = jobDetail.getKey();
@@ -116,7 +116,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest {
     public void testGenCronTrigger() {
         // normal
         ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
-        JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
MockJob.class);
+        JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
MockJob.class, null, null, null, null);
 
         Trigger trigger = ScheduleUtils.genQuartzTrigger(jobDetail, 
scheduleInfo);
         assertNotNull(trigger);
@@ -139,7 +139,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest {
 
         // cron
         scheduleInfo = genDefaultCronScheduleInfo();
-        jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
MockJob.class);
+        jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo, 
MockJob.class, null, null, null, null);
 
         trigger = ScheduleUtils.genQuartzTrigger(jobDetail, scheduleInfo);
         assertNotNull(trigger);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
index 39d3a2a3bf..8a8848d044 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
@@ -71,7 +71,7 @@ public class GroupScheduleResourceListener implements 
ScheduleOperateListener {
         GroupResourceProcessForm form = (GroupResourceProcessForm) 
context.getProcessForm();
         InlongGroupInfo groupInfo = form.getGroupInfo();
         final String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to register schedule info for groupId={}", groupId);
+        log.info("begin to process schedule resource for groupId={}", groupId);
 
         // handle schedule info after group approved
         scheduleOperator.handleGroupApprove(groupId);
@@ -79,7 +79,7 @@ public class GroupScheduleResourceListener implements 
ScheduleOperateListener {
         // after register schedule info successfully, add ext property to 
group ext info
         saveInfo(groupInfo, InlongConstants.REGISTER_SCHEDULE_STATUS,
                 InlongConstants.REGISTERED, groupInfo.getExtList());
-        log.info("success to register schedule info for group={}", groupId);
+        log.info("success to process schedule resource for group={}", groupId);
         return ListenerResult.success();
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
index a20fa0cb10..268109e1a2 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
@@ -172,10 +172,9 @@ public class ScheduleOperatorImpl implements 
ScheduleOperator {
             LOGGER.warn("schedule info not exist for group {}", groupId);
             return false;
         }
-        ScheduleInfo scheduleInfo = getScheduleInfo(groupId);
         // change schedule state to approved
         scheduleService.updateStatus(groupId, APPROVED, null);
-        return registerToScheduleEngine(scheduleInfo, null, false);
+        return registerToScheduleEngine(getScheduleInfo(groupId), null, false);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-web/src/test/resources/application.properties 
b/inlong-manager/manager-web/src/test/resources/application.properties
index ad0cbd182d..014b579fd3 100644
--- a/inlong-manager/manager-web/src/test/resources/application.properties
+++ b/inlong-manager/manager-web/src/test/resources/application.properties
@@ -60,3 +60,10 @@ inlong.encrypt.key.value1="I!N@L#O$N%G^"
 
 # clients (e.g. agent and dataproxy) must be authenticated by secretId and 
secretKey if turned on
 openapi.auth.enabled=false
+
+# the secreteId and secreteKey for inlong sub-system communication
+# used for offline job schedule now:
+# 1. when register schedule info, secreteId and secreteKey will be registered 
to schedule engine
+# and the schedule instance will call back to submit offline job with 
secreteId and secreteKey
+inlong.inner.secrete.id=admin
+inlong.inner.secrete.key=87haw3VYTPqK5fK0
\ No newline at end of file

Reply via email to