This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/dev-offline-sync by this push:
new 8f75bf6708 [INLONG-10514][Manager] Support built-in schedule engine
trigger submitting of Flink batch job (#10515)
8f75bf6708 is described below
commit 8f75bf6708b323b8b7612075ca62cd589ac96f80
Author: AloysZhang <[email protected]>
AuthorDate: Thu Jun 27 11:45:21 2024 +0800
[INLONG-10514][Manager] Support built-in schedule engine trigger submitting
of Flink batch job (#10515)
---
.../client/api/inner/client/InlongGroupClient.java | 5 +++
.../manager/client/api/service/InlongGroupApi.java | 3 ++
.../manager/common/consts/InlongConstants.java | 2 +-
.../plugin/offline/FlinkOfflineJobOperator.java | 2 +-
.../inlong/manager/plugin/util/FlinkUtils.java | 20 ++++++++-
inlong-manager/manager-schedule/pom.xml | 5 +++
.../schedule/quartz/QuartzOfflineSyncJob.java | 49 ++++++++++++++++++++--
.../schedule/quartz/QuartzScheduleEngine.java | 27 +++++++++---
.../schedule/quartz/QuartzSchedulerListener.java | 36 +++++++++-------
.../manager/schedule/util/ScheduleUtils.java | 22 +++++++++-
.../manager/schedule/util/ScheduleUtilsTest.java | 6 +--
.../schedule/GroupScheduleResourceListener.java | 4 +-
.../service/schedule/ScheduleOperatorImpl.java | 3 +-
.../src/test/resources/application.properties | 7 ++++
14 files changed, 156 insertions(+), 35 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index 54b61f91c2..fcc907396e 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -320,4 +320,9 @@ public class InlongGroupClient {
return response.getData();
}
+ public Boolean submitOfflineJob(String groupId) {
+ Response<Boolean> responseBody =
ClientUtils.executeHttpCall(inlongGroupApi.submitOfflineJob(groupId));
+ ClientUtils.assertRespSuccess(responseBody);
+ return responseBody.getData();
+ }
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
index 819d9802d9..930a3a8144 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
@@ -98,4 +98,7 @@ public interface InlongGroupApi {
@GET("group/switch/finish/{groupId}")
Call<Response<Boolean>> finishTagSwitch(@Path("groupId") String groupId);
+
+ @POST("group/submitOfflineJob/{groupId}")
+ Call<Response<Boolean>> submitOfflineJob(@Path("groupId") String groupId);
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index e9a18439d2..4abcc0f359 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -90,7 +90,7 @@ public class InlongConstants {
public static final Integer DATASYNC_REALTIME_MODE = 1;
public static final Integer DATASYNC_OFFLINE_MODE = 2;
- public static final String RUNTIME_EXECUTION_MODE_STREAMING = "streaming";
+ public static final String RUNTIME_EXECUTION_MODE_STREAM = "stream";
public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch";
public static final Integer DISABLE_ZK = 0;
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
index 09740c53f3..1c93e7636a 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
@@ -31,6 +31,6 @@ public class FlinkOfflineJobOperator implements
OfflineJobOperator {
@Override
public void submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList) throws Exception {
- submitFlinkJobs(groupId, streamInfoList);
+ submitFlinkJobs(groupId, streamInfoList, true);
}
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index 82a0d628c1..fd0c2c8bf2 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -58,6 +58,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.RUNTIME_EXECUTION_MODE_BATCH;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.RUNTIME_EXECUTION_MODE_STREAM;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN;
import static
org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION;
@@ -220,6 +222,11 @@ public class FlinkUtils {
public static ListenerResult submitFlinkJobs(String groupId,
List<InlongStreamInfo> streamInfoList)
throws Exception {
+ return submitFlinkJobs(groupId, streamInfoList, false);
+ }
+
+ public static ListenerResult submitFlinkJobs(String groupId,
List<InlongStreamInfo> streamInfoList,
+ boolean isBatchJob) throws Exception {
int sinkCount = streamInfoList.stream()
.map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
.reduce(0, Integer::sum);
@@ -232,7 +239,8 @@ public class FlinkUtils {
List<ListenerResult> listenerResults = new ArrayList<>();
for (InlongStreamInfo streamInfo : streamInfoList) {
- listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo,
FlinkUtils.genFlinkJobName(streamInfo)));
+ listenerResults.add(
+ FlinkUtils.submitFlinkJob(streamInfo,
FlinkUtils.genFlinkJobName(streamInfo), isBatchJob));
}
// only one stream in group for now
@@ -246,6 +254,11 @@ public class FlinkUtils {
}
public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo,
String jobName) throws Exception {
+ return submitFlinkJob(streamInfo, jobName, false);
+ }
+
+ public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo,
String jobName, boolean isBatchJob)
+ throws Exception {
List<StreamSink> sinkList = streamInfo.getSinkList();
List<String> sinkTypes =
sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
if (CollectionUtils.isEmpty(sinkList) ||
!SinkType.containSortFlinkSink(sinkTypes)) {
@@ -283,6 +296,11 @@ public class FlinkUtils {
String sortUrl = kvConf.get(InlongConstants.SORT_URL);
flinkInfo.setEndpoint(sortUrl);
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
+ if (isBatchJob) {
+ flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_BATCH);
+ } else {
+ flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_STREAM);
+ }
FlinkOperation flinkOperation = FlinkOperation.getInstance();
try {
flinkOperation.genPath(flinkInfo, dataflow);
diff --git a/inlong-manager/manager-schedule/pom.xml
b/inlong-manager/manager-schedule/pom.xml
index 8d598b47ca..8b22e39ee4 100644
--- a/inlong-manager/manager-schedule/pom.xml
+++ b/inlong-manager/manager-schedule/pom.xml
@@ -38,6 +38,11 @@
<artifactId>manager-pojo</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>manager-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
index 8ac9d3b6fb..2b66766474 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
@@ -17,24 +17,67 @@
package org.apache.inlong.manager.schedule.quartz;
-import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.quartz.Job;
+import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import static
org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_HOST;
+import static
org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_PORT;
+import static org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_ID;
+import static
org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_KEY;
@Data
@NoArgsConstructor
@AllArgsConstructor
+@Service
public class QuartzOfflineSyncJob implements Job {
- private ScheduleInfo scheduleInfo;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QuartzOfflineSyncJob.class);
+
+ private volatile InlongGroupClient groupClient;
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
- // TODO: complete the offline sync logic
+ LOGGER.info("QuartzOfflineSyncJob run once");
+ JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
+ initGroupClientIfNeeded(jobDataMap);
+ String inlongGroupId = context.getJobDetail().getKey().getName();
+ LOGGER.info("Starting submit offline job for group {}", inlongGroupId);
+ if (groupClient.submitOfflineJob(inlongGroupId)) {
+ LOGGER.info("Successfully submitting offline job for group {}",
inlongGroupId);
+ } else {
+ LOGGER.warn("Failed to submit offline job for group {}",
inlongGroupId);
+ }
+ }
+
+ private void initGroupClientIfNeeded(JobDataMap jobDataMap) {
+ if (groupClient == null) {
+ String host = (String) jobDataMap.get(MANAGER_HOST);
+ int port = (int) jobDataMap.get(MANAGER_PORT);
+ String secreteId = (String) jobDataMap.get(SECRETE_ID);
+ String secreteKey = (String) jobDataMap.get(SECRETE_KEY);
+ LOGGER.info("Initializing Inlong group client, with host: {},
port: {}, userName : {}",
+ host, port, secreteId);
+ ClientConfiguration configuration = new ClientConfiguration();
+ configuration.setAuthentication(new
DefaultAuthentication(secreteId, secreteKey));
+ String serviceUrl = host + ":" + port;
+ InlongClientImpl inlongClient = new InlongClientImpl(serviceUrl,
configuration);
+ ClientFactory clientFactory =
ClientUtils.getClientFactory(inlongClient.getConfiguration());
+ groupClient = clientFactory.getGroupClient();
+ }
}
}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
index e8bb1085ce..58f6e7c176 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
@@ -32,6 +32,7 @@ import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.HashSet;
@@ -50,6 +51,18 @@ public class QuartzScheduleEngine implements ScheduleEngine {
private static final Logger LOGGER =
LoggerFactory.getLogger(QuartzScheduleEngine.class);
+ @Value("${server.host:127.0.0.1}")
+ private String host;
+
+ @Value("${server.port:8083}")
+ private int port;
+
+ @Value("${inlong.inner.secrete.id:admin}")
+ private String secretId;
+
+ @Value("${inlong.inner.secrete.key:87haw3VYTPqK5fK0}")
+ private String secretKey;
+
private final Scheduler scheduler;
private final Set<String> scheduledJobSet = new HashSet<>();
@@ -60,6 +73,7 @@ public class QuartzScheduleEngine implements ScheduleEngine {
} catch (SchedulerException e) {
throw new QuartzScheduleException("Failed to init quartz scheduler
", e);
}
+ start();
}
@Override
@@ -68,7 +82,8 @@ public class QuartzScheduleEngine implements ScheduleEngine {
// add listener
scheduler.getListenerManager().addSchedulerListener(new
QuartzSchedulerListener(this));
scheduler.start();
- LOGGER.info("Quartz scheduler engine started");
+ LOGGER.info("Quartz scheduler engine started, inlong manager host
{}, port {}, secretId {}",
+ host, port, secretId);
} catch (SchedulerException e) {
throw new QuartzScheduleException("Failed to start quartz
scheduler ", e);
}
@@ -79,7 +94,7 @@ public class QuartzScheduleEngine implements ScheduleEngine {
* */
public boolean triggerFinalized(Trigger trigger) {
String jobName = trigger.getJobKey().getName();
- LOGGER.info("Trigger finalized for job {}", jobName);
+ LOGGER.info("Quartz trigger finalized for job {}", jobName);
return scheduledJobSet.remove(jobName);
}
@@ -97,12 +112,12 @@ public class QuartzScheduleEngine implements
ScheduleEngine {
if (scheduledJobSet.contains(scheduleInfo.getInlongGroupId())) {
throw new QuartzScheduleException("Group " +
scheduleInfo.getInlongGroupId() + " is already registered");
}
- JobDetail jobDetail = genQuartzJobDetail(scheduleInfo, clz);
+ JobDetail jobDetail = genQuartzJobDetail(scheduleInfo, clz, host,
port, secretId, secretKey);
Trigger trigger = genQuartzTrigger(jobDetail, scheduleInfo);
try {
scheduler.scheduleJob(jobDetail, trigger);
scheduledJobSet.add(scheduleInfo.getInlongGroupId());
- LOGGER.info("Registered new schedule info for {}",
scheduleInfo.getInlongGroupId());
+ LOGGER.info("Registered new quartz schedule info for {}",
scheduleInfo.getInlongGroupId());
} catch (SchedulerException e) {
throw new QuartzScheduleException(e.getMessage());
}
@@ -123,7 +138,7 @@ public class QuartzScheduleEngine implements ScheduleEngine
{
}
}
scheduledJobSet.remove(groupId);
- LOGGER.info("Un-registered schedule info for {}", groupId);
+ LOGGER.info("Un-registered quartz schedule info for {}", groupId);
return true;
}
@@ -140,7 +155,7 @@ public class QuartzScheduleEngine implements ScheduleEngine
{
public boolean handleUpdate(ScheduleInfo scheduleInfo, Class<? extends
Job> clz) {
handleUnregister(scheduleInfo.getInlongGroupId());
handleRegister(scheduleInfo, clz);
- LOGGER.info("Updated schedule info for {}",
scheduleInfo.getInlongGroupId());
+ LOGGER.info("Updated quartz schedule info for {}",
scheduleInfo.getInlongGroupId());
return false;
}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java
index 49a9583eaf..6d43b54f9c 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java
@@ -23,13 +23,17 @@ import org.quartz.SchedulerException;
import org.quartz.SchedulerListener;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Default implementation for quartz scheduler listener.
* */
public class QuartzSchedulerListener implements SchedulerListener {
- QuartzScheduleEngine quartzScheduleEngine;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QuartzSchedulerListener.class);
+
+ private QuartzScheduleEngine quartzScheduleEngine;
public QuartzSchedulerListener(QuartzScheduleEngine quartzScheduleEngine) {
this.quartzScheduleEngine = quartzScheduleEngine;
@@ -37,22 +41,24 @@ public class QuartzSchedulerListener implements
SchedulerListener {
@Override
public void jobScheduled(Trigger trigger) {
-
+ LOGGER.info("Quartz job with key {} scheduled",
trigger.getKey().getName());
}
@Override
public void jobUnscheduled(TriggerKey triggerKey) {
-
+ LOGGER.info("Quartz job with key {} un-scheduled",
triggerKey.getName());
}
@Override
public void triggerFinalized(Trigger trigger) {
quartzScheduleEngine.triggerFinalized(trigger);
+ LOGGER.info("Quartz trigger with key {} startTime {} ande endTime {}
is finalized",
+ trigger.getKey().getName(), trigger.getStartTime(),
trigger.getEndTime());
}
@Override
public void triggerPaused(TriggerKey triggerKey) {
-
+ LOGGER.info("Quartz trigger with key {} paused", triggerKey.getName());
}
@Override
@@ -62,7 +68,7 @@ public class QuartzSchedulerListener implements
SchedulerListener {
@Override
public void triggerResumed(TriggerKey triggerKey) {
-
+ LOGGER.info("Quartz trigger with key {} Resume", triggerKey.getName());
}
@Override
@@ -72,17 +78,17 @@ public class QuartzSchedulerListener implements
SchedulerListener {
@Override
public void jobAdded(JobDetail jobDetail) {
-
+ LOGGER.info("New quartz job added, name {}",
jobDetail.getKey().getName());
}
@Override
public void jobDeleted(JobKey jobKey) {
-
+ LOGGER.info("Quartz job deleted, name {}", jobKey.getName());
}
@Override
public void jobPaused(JobKey jobKey) {
-
+ LOGGER.info("Quartz job paused, name {}", jobKey.getName());
}
@Override
@@ -92,7 +98,7 @@ public class QuartzSchedulerListener implements
SchedulerListener {
@Override
public void jobResumed(JobKey jobKey) {
-
+ LOGGER.info("Quartz job resumed, name {}", jobKey.getName());
}
@Override
@@ -102,7 +108,7 @@ public class QuartzSchedulerListener implements
SchedulerListener {
@Override
public void schedulerError(String msg, SchedulerException cause) {
-
+ LOGGER.warn("Quartz schedule exception, errorMsg {}", msg, cause);
}
@Override
@@ -112,26 +118,26 @@ public class QuartzSchedulerListener implements
SchedulerListener {
@Override
public void schedulerStarted() {
-
+ LOGGER.warn("Quartz scheduler started");
}
@Override
public void schedulerStarting() {
-
+ LOGGER.warn("Quartz scheduler starting");
}
@Override
public void schedulerShutdown() {
-
+ LOGGER.warn("Quartz scheduler shutdown");
}
@Override
public void schedulerShuttingdown() {
-
+ LOGGER.warn("Quartz scheduler shutting down");
}
@Override
public void schedulingDataCleared() {
-
+ LOGGER.warn("Quartz scheduler data cleared");
}
}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
index 1e4f43983e..2cd2a80b39 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
@@ -33,6 +33,8 @@ import org.quartz.SimpleScheduleBuilder;
import org.quartz.SimpleTrigger;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.sql.Timestamp;
import java.util.Date;
@@ -42,9 +44,21 @@ import java.util.Date;
* */
public class ScheduleUtils {
- public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo,
Class<? extends Job> clz) {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ScheduleUtils.class);
+
+ public static final String MANAGER_HOST = "HOST";
+ public static final String MANAGER_PORT = "PORT";
+ public static final String SECRETE_ID = "SECRETE_ID";
+ public static final String SECRETE_KEY = "SECRETE_KEY";
+
+ public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo,
Class<? extends Job> clz,
+ String host, Integer port, String secreteId, String secreteKey) {
return JobBuilder.newJob(clz)
.withIdentity(scheduleInfo.getInlongGroupId())
+ .usingJobData(MANAGER_HOST, host)
+ .usingJobData(MANAGER_PORT, port)
+ .usingJobData(SECRETE_ID, secreteId)
+ .usingJobData(SECRETE_KEY, secreteKey)
.build();
}
@@ -57,6 +71,12 @@ public class ScheduleUtils {
if (type == null) {
throw new QuartzScheduleException("Invalid schedule type: " +
scheduleType);
}
+ LOGGER.info("Creating quartz trigger for key : {}, startTime : {},
endTime : {}, scheduleTYpe : {}, "
+ + "scheduleUnit : {}, scheduleInterval : {}, crontabExpression
: {}",
+ key, startTime, endTime, type.name(),
+ scheduleInfo.getScheduleUnit(),
+ scheduleInfo.getScheduleInterval(),
+ scheduleInfo.getCrontabExpression());
switch (type) {
case NORMAL:
return TriggerBuilder.newTrigger()
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
index da8fd66c61..b515b7ae92 100644
---
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
@@ -102,7 +102,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest {
@Test
public void testGenJobDetail() {
ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
- JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo,
MockJob.class);
+ JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo,
MockJob.class, null, null, null, null);
assertNotNull(jobDetail);
JobKey jobKey = jobDetail.getKey();
@@ -116,7 +116,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest {
public void testGenCronTrigger() {
// normal
ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
- JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo,
MockJob.class);
+ JobDetail jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo,
MockJob.class, null, null, null, null);
Trigger trigger = ScheduleUtils.genQuartzTrigger(jobDetail,
scheduleInfo);
assertNotNull(trigger);
@@ -139,7 +139,7 @@ public class ScheduleUtilsTest extends BaseScheduleTest {
// cron
scheduleInfo = genDefaultCronScheduleInfo();
- jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo,
MockJob.class);
+ jobDetail = ScheduleUtils.genQuartzJobDetail(scheduleInfo,
MockJob.class, null, null, null, null);
trigger = ScheduleUtils.genQuartzTrigger(jobDetail, scheduleInfo);
assertNotNull(trigger);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
index 39d3a2a3bf..8a8848d044 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
@@ -71,7 +71,7 @@ public class GroupScheduleResourceListener implements
ScheduleOperateListener {
GroupResourceProcessForm form = (GroupResourceProcessForm)
context.getProcessForm();
InlongGroupInfo groupInfo = form.getGroupInfo();
final String groupId = groupInfo.getInlongGroupId();
- log.info("begin to register schedule info for groupId={}", groupId);
+ log.info("begin to process schedule resource for groupId={}", groupId);
// handle schedule info after group approved
scheduleOperator.handleGroupApprove(groupId);
@@ -79,7 +79,7 @@ public class GroupScheduleResourceListener implements
ScheduleOperateListener {
// after register schedule info successfully, add ext property to
group ext info
saveInfo(groupInfo, InlongConstants.REGISTER_SCHEDULE_STATUS,
InlongConstants.REGISTERED, groupInfo.getExtList());
- log.info("success to register schedule info for group={}", groupId);
+ log.info("success to process schedule resource for group={}", groupId);
return ListenerResult.success();
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
index a20fa0cb10..268109e1a2 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
@@ -172,10 +172,9 @@ public class ScheduleOperatorImpl implements
ScheduleOperator {
LOGGER.warn("schedule info not exist for group {}", groupId);
return false;
}
- ScheduleInfo scheduleInfo = getScheduleInfo(groupId);
// change schedule state to approved
scheduleService.updateStatus(groupId, APPROVED, null);
- return registerToScheduleEngine(scheduleInfo, null, false);
+ return registerToScheduleEngine(getScheduleInfo(groupId), null, false);
}
@Override
diff --git
a/inlong-manager/manager-web/src/test/resources/application.properties
b/inlong-manager/manager-web/src/test/resources/application.properties
index ad0cbd182d..014b579fd3 100644
--- a/inlong-manager/manager-web/src/test/resources/application.properties
+++ b/inlong-manager/manager-web/src/test/resources/application.properties
@@ -60,3 +60,10 @@ inlong.encrypt.key.value1="I!N@L#O$N%G^"
# clients (e.g. agent and dataproxy) must be authenticated by secretId and
secretKey if turned on
openapi.auth.enabled=false
+
+# the secreteId and secreteKey for inlong sub-system communication
+# used for offline job schedule now:
+# 1. when register schedule info, secreteId and secreteKey will be registered
to schedule engine
+# and the schedule instance will call back to submit offline job with
secreteId and secreteKey
+inlong.inner.secrete.id=admin
+inlong.inner.secrete.key=87haw3VYTPqK5fK0
\ No newline at end of file