This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 042e568624 [INLONG-11401][Manager] Support Dolphinscheduler schedule
engine (#11468)
042e568624 is described below
commit 042e56862440422ca0e6f045996908ed6a54a8ba
Author: emptyOVO <[email protected]>
AuthorDate: Wed Nov 20 10:27:58 2024 +0800
[INLONG-11401][Manager] Support Dolphinscheduler schedule engine (#11468)
---
.../schedule/dolphinschedule/DSTaskDefinition.java | 120 ++++
.../schedule/dolphinschedule/DSTaskParams.java | 46 ++
.../schedule/dolphinschedule/DSTaskRelation.java | 59 ++
.../schedule/dolphinschedule/DScheduleInfo.java} | 32 +-
inlong-manager/manager-schedule/pom.xml | 30 +
.../manager/schedule/ScheduleEngineType.java | 3 +-
.../dolphinscheduler/DolphinScheduleClient.java | 58 ++
.../dolphinscheduler/DolphinScheduleConstants.java | 76 ++
.../dolphinscheduler/DolphinScheduleEngine.java | 268 +++++++
.../dolphinscheduler/DolphinScheduleOperator.java | 173 +++++
.../dolphinscheduler/DolphinScheduleUtils.java | 790 +++++++++++++++++++++
.../exception/DolphinScheduleException.java | 105 +++
.../DolphinScheduleContainerTestEnv.java | 170 +++++
.../DolphinScheduleEngineTest.java | 127 ++++
.../DolphinSchedulerContainerEnvConstants.java | 51 ++
.../src/main/resources/application-dev.properties | 10 +-
.../src/main/resources/application-prod.properties | 11 +-
.../src/main/resources/application-test.properties | 11 +-
18 files changed, 2114 insertions(+), 26 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java
new file mode 100644
index 0000000000..700638b63c
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+public class DSTaskDefinition {
+
+ @ApiModelProperty("DolphinScheduler task definition code")
+ @JsonProperty("code")
+ private long code;
+
+ @ApiModelProperty("DolphinScheduler task definition code")
+ @JsonProperty("delayTime")
+ private String delayTime;
+
+ @ApiModelProperty("DolphinScheduler task definition description")
+ @JsonProperty("description")
+ private String description;
+
+ @ApiModelProperty("DolphinScheduler task definition environment code")
+ @JsonProperty("environmentCode")
+ private int environmentCode;
+
+ @ApiModelProperty("DolphinScheduler task fail retry interval")
+ @JsonProperty("failRetryInterval")
+ private String failRetryInterval;
+
+ @ApiModelProperty("DolphinScheduler task definition fail retry times")
+ @JsonProperty("failRetryTimes")
+ private String failRetryTimes;
+
+ @ApiModelProperty("DolphinScheduler task definition flag")
+ @JsonProperty("flag")
+ private String flag;
+
+ @ApiModelProperty("DolphinScheduler task definition isCache")
+ @JsonProperty("isCache")
+ private String isCache;
+
+ @ApiModelProperty("DolphinScheduler task definition name")
+ @JsonProperty("name")
+ private String name;
+
+ @ApiModelProperty("DolphinScheduler task definition params")
+ @JsonProperty("taskParams")
+ private DSTaskParams taskParams;
+
+ @ApiModelProperty("DolphinScheduler task definition priority")
+ @JsonProperty("taskPriority")
+ private String taskPriority;
+
+ @ApiModelProperty("DolphinScheduler task definition type")
+ @JsonProperty("taskType")
+ private String taskType;
+
+ @ApiModelProperty("DolphinScheduler task definition timeout")
+ @JsonProperty("timeout")
+ private int timeout;
+
+ @ApiModelProperty("DolphinScheduler task definition timeout flag")
+ @JsonProperty("timeoutFlag")
+ private String timeoutFlag;
+
+ @ApiModelProperty("DolphinScheduler task definition timeout notify
strategy")
+ @JsonProperty("timeoutNotifyStrategy")
+ private String timeoutNotifyStrategy;
+
+ @ApiModelProperty("DolphinScheduler task definition worker group")
+ @JsonProperty("workerGroup")
+ private String workerGroup;
+
+ @ApiModelProperty("DolphinScheduler task definition apu quota")
+ @JsonProperty("cpuQuota")
+ private int cpuQuota;
+
+ @ApiModelProperty("DolphinScheduler task definition memory max")
+ @JsonProperty("memoryMax")
+ private int memoryMax;
+
+ @ApiModelProperty("DolphinScheduler task definition execute type")
+ @JsonProperty("taskExecuteType")
+ private String taskExecuteType;
+
+ public DSTaskDefinition() {
+ this.delayTime = "0";
+ this.description = "";
+ this.environmentCode = -1;
+ this.failRetryInterval = "1";
+ this.failRetryTimes = "0";
+ this.flag = "YES";
+ this.isCache = "NO";
+ this.taskPriority = "MEDIUM";
+ this.taskType = "SHELL";
+ this.timeoutFlag = "CLOSE";
+ this.timeoutNotifyStrategy = "";
+ this.workerGroup = "default";
+ this.cpuQuota = -1;
+ this.memoryMax = -1;
+ this.taskExecuteType = "BATCH";
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java
new file mode 100644
index 0000000000..a5344f5fac
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.List;
+@Data
+public class DSTaskParams {
+
+ @ApiModelProperty("DolphinScheduler task params local params")
+ @JsonProperty("localParams")
+ private List<Object> localParams;
+
+ @ApiModelProperty("DolphinScheduler task params raw script")
+ @JsonProperty("rawScript")
+ private String rawScript;
+
+ @ApiModelProperty("DolphinScheduler task params resource list")
+ @JsonProperty("resourceList")
+ private List<Object> resourceList;
+
+ public DSTaskParams() {
+ this.localParams = new ArrayList<>();
+ this.resourceList = new ArrayList<>();
+ this.rawScript = "";
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java
new file mode 100644
index 0000000000..e853317df7
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+@Data
+public class DSTaskRelation {
+
+ @ApiModelProperty("DolphinScheduler task relation name")
+ @JsonProperty("name")
+ private String name;
+
+ @ApiModelProperty("DolphinScheduler task relation pre-task code")
+ @JsonProperty("preTaskCode")
+ private int preTaskCode;
+
+ @ApiModelProperty("DolphinScheduler task relation pre-task version")
+ @JsonProperty("preTaskVersion")
+ private int preTaskVersion;
+
+ @ApiModelProperty("DolphinScheduler task relation post-task code")
+ @JsonProperty("postTaskCode")
+ private long postTaskCode;
+
+ @ApiModelProperty("DolphinScheduler task relation post-task version")
+ @JsonProperty("postTaskVersion")
+ private int postTaskVersion;
+
+ @ApiModelProperty("DolphinScheduler task relation condition type")
+ @JsonProperty("conditionType")
+ private String conditionType;
+
+ @ApiModelProperty("DolphinScheduler task relation condition params")
+ @JsonProperty("conditionParams")
+ private Object conditionParams;
+
+ public DSTaskRelation() {
+ this.name = "";
+ this.conditionType = "NONE";
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java
similarity index 52%
copy from
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java
index 71949ef744..ac45b26f19 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java
@@ -15,19 +15,29 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
-import lombok.Getter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
-@Getter
-public enum ScheduleEngineType {
+@Data
+public class DScheduleInfo {
- NONE("None"),
- QUARTZ("Quartz");
+ @ApiModelProperty("DolphinScheduler schedule start time")
+ @JsonProperty("startTime")
+ private String startTime;
- private final String type;
+ @ApiModelProperty("DolphinScheduler schedule end time")
+ @JsonProperty("endTime")
+ private String endTime;
- ScheduleEngineType(String type) {
- this.type = type;
- }
-}
\ No newline at end of file
+ @ApiModelProperty("DolphinScheduler schedule crontab expression")
+ @JsonProperty("crontab")
+ private String crontab;
+
+ @ApiModelProperty("DolphinScheduler schedule timezone id")
+ @JsonProperty("timezoneId")
+ private String timezoneId;
+
+}
diff --git a/inlong-manager/manager-schedule/pom.xml
b/inlong-manager/manager-schedule/pom.xml
index a9d9fb3e1e..82632a1aff 100644
--- a/inlong-manager/manager-schedule/pom.xml
+++ b/inlong-manager/manager-schedule/pom.xml
@@ -73,5 +73,35 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-test</artifactId>
+ <version>${spring.boot.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.vaadin.external.google</groupId>
+ <artifactId>android-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
index 71949ef744..ac71e4e2d1 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java
@@ -23,7 +23,8 @@ import lombok.Getter;
public enum ScheduleEngineType {
NONE("None"),
- QUARTZ("Quartz");
+ QUARTZ("Quartz"),
+ DOLPHINSCHEDULER("DolphinScheduler");
private final String type;
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java
new file mode 100644
index 0000000000..a75085b9ac
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.schedule.ScheduleEngineClient;
+import org.apache.inlong.manager.schedule.ScheduleEngineType;
+
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+
+/**
+ * Built-in implementation of third-party schedule engine client corresponding
with {@link DolphinScheduleEngine}.
+ * DolphinScheduleClient simply invokes the {@link DolphinScheduleEngine} to
register/unregister/update
+ * schedule info, all the logic for invoking the remote scheduling service is
implemented in {@link DolphinScheduleEngine}
+ */
+@Service
+public class DolphinScheduleClient implements ScheduleEngineClient {
+
+ @Resource
+ public DolphinScheduleEngine scheduleEngine;
+
+ @Override
+ public boolean accept(String engineType) {
+ return
ScheduleEngineType.DOLPHINSCHEDULER.getType().equalsIgnoreCase(engineType);
+ }
+
+ @Override
+ public boolean register(ScheduleInfo scheduleInfo) {
+ return scheduleEngine.handleRegister(scheduleInfo);
+ }
+
+ @Override
+ public boolean unregister(String groupId) {
+ return scheduleEngine.handleUnregister(groupId);
+ }
+
+ @Override
+ public boolean update(ScheduleInfo scheduleInfo) {
+ return scheduleEngine.handleUpdate(scheduleInfo);
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java
new file mode 100644
index 0000000000..89dcda5b77
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+public class DolphinScheduleConstants {
+
+ // DS public constants
+ public static final String DS_ID = "id";
+ public static final String DS_CODE = "code";
+ public static final String DS_TOKEN = "token";
+ public static final String DS_PAGE_SIZE = "pageSize";
+ public static final String DS_PAGE_NO = "pageNo";
+ public static final String DS_SEARCH_VAL = "searchVal";
+ public static final String DS_RESPONSE_DATA = "data";
+ public static final String DS_RESPONSE_NAME = "name";
+ public static final String DS_RESPONSE_TOTAL_LIST = "totalList";
+ public static final String DS_DEFAULT_PAGE_SIZE = "10";
+ public static final String DS_DEFAULT_PAGE_NO = "1";
+ public static final String DS_DEFAULT_TIMEZONE_ID = "Asia/Shanghai";
+
+ // DS project related constants
+ public static final String DS_PROJECT_URL = "/projects";
+ public static final String DS_PROJECT_NAME = "projectName";
+ public static final String DS_PROJECT_DESC = "description";
+ public static final String DS_DEFAULT_PROJECT_NAME =
"default_inlong_offline_scheduler";
+ public static final String DS_DEFAULT_PROJECT_DESC = "default scheduler
project for inlong offline job";
+
+ // DS task related constants
+ public static final String DS_TASK_CODE_URL =
"/task-definition/gen-task-codes";
+ public static final String DS_TASK_RELATION = "taskRelationJson";
+ public static final String DS_TASK_DEFINITION = "taskDefinitionJson";
+ public static final String DS_TASK_GEN_NUM = "genNum";
+ public static final String DS_DEFAULT_TASK_GEN_NUM = "1";
+ public static final String DS_DEFAULT_TASK_NAME =
"default-inlong-http-callback";
+ public static final String DS_DEFAULT_TASK_DESC = "default http request
using shell script callbacks to inlong";
+
+ // DS process definition related constants
+ public static final String DS_PROCESS_URL = "/process-definition";
+ public static final String DS_PROCESS_QUERY_URL =
"/query-process-definition-list";
+ public static final String DS_PROCESS_NAME = "name";
+ public static final String DS_PROCESS_DESC = "description";
+ public static final String DS_PROCESS_CODE = "processDefinitionCode";
+ public static final String DS_DEFAULT_PROCESS_NAME =
"_inlong_offline_process_definition";
+ public static final String DS_DEFAULT_PROCESS_DESC = "scheduler process
definition for inlong group: ";
+
+ // DS release related constants
+ public static final String DS_RELEASE_URL = "/release";
+ public static final String DS_RELEASE_STATE = "releaseState";
+
+ // DS schedule related constants
+ public static final String DS_SCHEDULE_URL = "/schedules";
+ public static final String DS_SCHEDULE_DEF = "schedule";
+ public static final String DS_DEFAULT_SCHEDULE_TIME_FORMAT = "yyyy-MM-dd
HH:mm:ss";
+
+ // DS online/offline related constants
+ public static final String DS_ONLINE_URL = "/online";
+ public static final String DS_ONLINE_STATE = "ONLINE";
+ public static final String DS_OFFLINE_URL = "/offline";
+ public static final String DS_OFFLINE_STATE = "OFFLINE";
+
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
new file mode 100644
index 0000000000..7b09481cea
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.schedule.ScheduleEngine;
+import org.apache.inlong.manager.schedule.exception.DolphinScheduleException;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_DESC;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_NAME;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_DESC;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_NAME;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_OFFLINE_STATE;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_STATE;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_QUERY_URL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_URL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_URL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_URL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_CODE_URL;
+
+/**
+ * The default implementation of DolphinScheduler engine based on
DolphinScheduler API. Response for processing
+ * the register/unregister/update requests from {@link DolphinScheduleClient}
+ */
+@Data
+@Service
+public class DolphinScheduleEngine implements ScheduleEngine {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DolphinScheduleEngine.class);
+
+ @Value("${server.host:127.0.0.1}")
+ private String host;
+
+ @Value("${server.port:8083}")
+ private int port;
+
+ @Value("${default.admin.user:admin}")
+ private String username;
+
+ @Value("${default.admin.password:inlong}")
+ private String password;
+
+
@Value("${schedule.engine.dolphinscheduler.url:http://127.0.0.1:12345/dolphinscheduler}")
+ private String dolphinUrl;
+
+ @Value("${schedule.engine.dolphinscheduler.token:default_token_value}")
+ private String token;
+
+ @Resource
+ private DolphinScheduleOperator dolphinScheduleOperator;
+
+ private long projectCode;
+ private String projectBaseUrl;
+ private final Map<Long, String> scheduledProcessMap;
+
+ @PostConstruct
+ public void init() {
+ this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
+ }
+
+ public DolphinScheduleEngine(String host, int port, String username,
String password, String dolphinUrl,
+ String token) {
+ this.host = host;
+ this.port = port;
+ this.username = username;
+ this.password = password;
+ this.dolphinUrl = dolphinUrl;
+ this.token = token;
+ this.scheduledProcessMap = new ConcurrentHashMap<>();
+ }
+
+ public DolphinScheduleEngine() {
+ this.scheduledProcessMap = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * check if there already exists a project for inlong offline schedule
+ * if no then build a new project for inlong-group-id in DolphinScheduler
+ */
+ @Override
+ public void start() {
+ LOGGER.info("Starting dolphin scheduler engine, Checking project
exists...");
+ long code =
dolphinScheduleOperator.checkAndGetUniqueId(projectBaseUrl, token,
DS_DEFAULT_PROJECT_NAME);
+ if (code != 0) {
+ LOGGER.info("Project exists, project code: {}", code);
+ this.projectCode = code;
+
+ LOGGER.info("Starting synchronize existing process definition");
+ String queryProcessDefUrl = projectBaseUrl + "/" + projectCode +
DS_PROCESS_URL + DS_PROCESS_QUERY_URL;
+
scheduledProcessMap.putAll(dolphinScheduleOperator.queryAllProcessDef(queryProcessDefUrl,
token));
+
+ } else {
+ LOGGER.info("There is no inlong offline project exists, default
project will be created");
+ this.projectCode =
+ dolphinScheduleOperator.creatProject(projectBaseUrl,
token, DS_DEFAULT_PROJECT_NAME,
+ DS_DEFAULT_PROJECT_DESC);
+ }
+ }
+
+ /**
+ * Handle schedule register.
+ * @param scheduleInfo schedule info to register
+ */
+ @Override
+ @VisibleForTesting
+ public boolean handleRegister(ScheduleInfo scheduleInfo) {
+ String processDefUrl = projectBaseUrl + "/" + projectCode +
DS_PROCESS_URL;
+ String scheduleUrl = projectBaseUrl + "/" + projectCode +
DS_SCHEDULE_URL;
+ String processName = scheduleInfo.getInlongGroupId() +
DS_DEFAULT_PROCESS_NAME;
+ String processDesc = DS_DEFAULT_PROCESS_DESC +
scheduleInfo.getInlongGroupId();
+
+ LOGGER.info("Dolphin Scheduler handle register begin for {}, Checking
process definition id uniqueness...",
+ scheduleInfo.getInlongGroupId());
+ try {
+ long processDefCode =
dolphinScheduleOperator.checkAndGetUniqueId(processDefUrl, token, processName);
+
+ boolean online = false;
+ if (processDefCode != 0 ||
scheduledProcessMap.containsKey(processDefCode)) {
+
+ // process definition already exists, delete and rebuild
+ LOGGER.info("Process definition exists, process definition id:
{}, deleting...", processDefCode);
+ if (dolphinScheduleOperator.releaseProcessDef(processDefUrl,
processDefCode, token, DS_OFFLINE_STATE)) {
+ dolphinScheduleOperator.deleteProcessDef(processDefUrl,
token, processDefCode);
+ scheduledProcessMap.remove(processDefCode);
+ }
+ }
+ String taskCodeUrl = projectBaseUrl + "/" + projectCode +
DS_TASK_CODE_URL;
+
+ long taskCode = dolphinScheduleOperator.genTaskCode(taskCodeUrl,
token);
+ LOGGER.info("Generate task code for process definition success,
task code: {}", taskCode);
+
+ long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo);
+ processDefCode =
+ dolphinScheduleOperator.createProcessDef(processDefUrl,
token, processName, processDesc, taskCode,
+ host, port,
+ username, password, offset,
scheduleInfo.getInlongGroupId());
+ LOGGER.info("Create process definition success, process definition
code: {}", processDefCode);
+
+ if (dolphinScheduleOperator.releaseProcessDef(processDefUrl,
processDefCode, token, DS_ONLINE_STATE)) {
+ LOGGER.info("Release process definition success, release
status: {}", DS_ONLINE_STATE);
+
+ int scheduleId =
dolphinScheduleOperator.createScheduleForProcessDef(scheduleUrl,
processDefCode, token,
+ scheduleInfo);
+ LOGGER.info("Create schedule for process definition success,
schedule info: {}", scheduleInfo);
+
+ online =
dolphinScheduleOperator.onlineScheduleForProcessDef(scheduleUrl, scheduleId,
token);
+ LOGGER.info("Online schedule for process definition, status:
{}", online);
+ }
+
+ scheduledProcessMap.putIfAbsent(processDefCode, processName);
+ return online;
+ } catch (Exception e) {
+ LOGGER.error("Failed to handle unregister dolphin scheduler: ", e);
+ throw new DolphinScheduleException(
+ String.format("Failed to handle unregister dolphin
scheduler: %s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Handle schedule unregister.
+ * @param groupId group to un-register schedule info
+ */
+ @Override
+ @VisibleForTesting
+ public boolean handleUnregister(String groupId) {
+ String processName = groupId + DS_DEFAULT_PROCESS_NAME;
+ String processDefUrl = projectBaseUrl + "/" + projectCode +
DS_PROCESS_URL;
+
+ LOGGER.info("Dolphin Scheduler handle Unregister begin for {},
Checking process definition id uniqueness...",
+ groupId);
+ try {
+ long processDefCode =
dolphinScheduleOperator.checkAndGetUniqueId(processDefUrl, token, processName);
+ if (processDefCode != 0 ||
scheduledProcessMap.containsKey(processDefCode)) {
+
+ LOGGER.info("Deleting process definition, process definition
id: {}", processDefCode);
+ if (dolphinScheduleOperator.releaseProcessDef(processDefUrl,
processDefCode, token, DS_OFFLINE_STATE)) {
+
+ dolphinScheduleOperator.deleteProcessDef(processDefUrl,
token, processDefCode);
+ scheduledProcessMap.remove(processDefCode);
+ LOGGER.info("Process definition deleted");
+ }
+ }
+ LOGGER.info("Un-registered dolphin schedule info for {}", groupId);
+ return !scheduledProcessMap.containsKey(processDefCode);
+ } catch (Exception e) {
+ LOGGER.error("Failed to handle unregister dolphin scheduler: ", e);
+ throw new DolphinScheduleException(
+ String.format("Failed to handle unregister dolphin
scheduler: %s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Handle schedule update.
+ * @param scheduleInfo schedule info to update
+ */
+ @Override
+ @VisibleForTesting
+ public boolean handleUpdate(ScheduleInfo scheduleInfo) {
+ LOGGER.info("Update dolphin schedule info for {}",
scheduleInfo.getInlongGroupId());
+ try {
+ return handleUnregister(scheduleInfo.getInlongGroupId()) &&
handleRegister(scheduleInfo);
+ } catch (Exception e) {
+ LOGGER.error("Failed to handle update dolphin scheduler: ", e);
+ throw new DolphinScheduleException(
+ String.format("Failed to handle update dolphin scheduler:
%s", e.getMessage()));
+ }
+ }
+
+ /**
+ * stop and delete all process definition in DolphinScheduler
+ * remove all process stored in scheduledProcessMap
+ * delete project for inlong-group-id in DolphinScheduler
+ */
+ @Override
+ public void stop() {
+ LOGGER.info("Stopping dolphin scheduler engine...");
+ String processDefUrl = projectBaseUrl + "/" + projectCode +
DS_PROCESS_URL;
+ try {
+
+ String queryProcessDefUrl = projectBaseUrl + "/" + projectCode +
DS_PROCESS_URL + DS_PROCESS_QUERY_URL;
+ Map<Long, String> allProcessDef =
dolphinScheduleOperator.queryAllProcessDef(queryProcessDefUrl, token);
+
+ for (Long processDefCode : allProcessDef.keySet()) {
+
+ LOGGER.info("delete process definition id: {}",
processDefCode);
+ dolphinScheduleOperator.releaseProcessDef(processDefUrl,
processDefCode, token, DS_OFFLINE_STATE);
+ dolphinScheduleOperator.deleteProcessDef(processDefUrl, token,
processDefCode);
+ scheduledProcessMap.remove(processDefCode);
+ }
+
+ dolphinScheduleOperator.deleteProject(projectBaseUrl, token,
projectCode);
+ LOGGER.info("Dolphin scheduler engine stopped");
+
+ } catch (Exception e) {
+ LOGGER.error("Failed to stop dolphin scheduler: ", e);
+ throw new DolphinScheduleException(String.format("Failed to stop
dolphin scheduler: %s", e.getMessage()));
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
new file mode 100644
index 0000000000..e317478c64
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.schedule.exception.DolphinScheduleException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNEXPECTED_ERROR;
+
+/**
+ * DolphinScheduler operator, This class includes methods for creating,
updating, and deleting projects,
+ * tasks, and process definitions in DolphinScheduler.
+ */
+@Service
+public class DolphinScheduleOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DolphinScheduleOperator.class);
+
+ /**
+ * Checks the uniqueness of a DolphinScheduler project ID based on the
given search value.
+ */
+ public long checkAndGetUniqueId(String url, String token, String
searchVal) {
+ try {
+ return DolphinScheduleUtils.checkAndGetUniqueId(url, token,
searchVal);
+ } catch (Exception e) {
+ LOGGER.error("Unexpected wrong in check id uniqueness: ", e);
+ throw new DolphinScheduleException(UNEXPECTED_ERROR,
+ String.format("Unexpected wrong in check id uniqueness:
%s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Creates a new project in DolphinScheduler.
+ */
+ public long creatProject(String url, String token, String projectName,
String description) {
+ try {
+ return DolphinScheduleUtils.creatProject(url, token, projectName,
description);
+ } catch (Exception e) {
+ LOGGER.error("Unexpected error while creating new project: ", e);
+ throw new DolphinScheduleException(UNEXPECTED_ERROR,
+ String.format("Unexpected error while creating new
project: %s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Query all process definition in DolphinScheduler project.
+ */
+ public Map<Long, String> queryAllProcessDef(String url, String token) {
+ try {
+ return DolphinScheduleUtils.queryAllProcessDef(url, token);
+ } catch (Exception e) {
+ LOGGER.error("Unexpected error while querying process definition:
", e);
+ throw new DolphinScheduleException(UNEXPECTED_ERROR,
+ String.format("Unexpected error while querying process
definition: %s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Generates a new task code in DolphinScheduler.
+ */
+ public long genTaskCode(String url, String token) {
+ try {
+ return DolphinScheduleUtils.genTaskCode(url, token);
+ } catch (Exception e) {
+ LOGGER.error("Unexpected wrong in generating task code: ", e);
+ throw new DolphinScheduleException(UNEXPECTED_ERROR,
+ String.format("Unexpected wrong in generating task code:
%s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Creates a process definition in DolphinScheduler.
+ */
+ public long createProcessDef(String url, String token, String name, String
desc, long taskCode, String host,
+ int port, String username, String password, long offset, String
groupId) {
+ try {
+ return DolphinScheduleUtils.createProcessDef(url, token, name,
desc, taskCode, host,
+ port, username, password, offset, groupId);
+ } catch (Exception e) {
+ LOGGER.error("Unexpected wrong in creating process definition: ",
e);
+ throw new DolphinScheduleException(UNEXPECTED_ERROR,
+ String.format("Unexpected wrong in creating process
definition: %s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Releases a process definition in DolphinScheduler.
+ */
+ public boolean releaseProcessDef(String processDefUrl, long
processDefCode, String token, String status) {
+ try {
+ return DolphinScheduleUtils.releaseProcessDef(processDefUrl,
processDefCode, token, status);
+ } catch (Exception e) {
+ LOGGER.error("Unexpected wrong in release process definition: ",
e);
+ throw new DolphinScheduleException(UNEXPECTED_ERROR,
+ String.format("Unexpected wrong in release process
definition: %s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Create a schedule for process definition in DolphinScheduler.
+ */
+ public int createScheduleForProcessDef(String url, long processDefCode,
String token, ScheduleInfo scheduleInfo) {
+ try {
+ return DolphinScheduleUtils.createScheduleForProcessDef(url,
processDefCode, token,
+ scheduleInfo);
+ } catch (Exception e) {
+ LOGGER.error("Unexpected wrong in creating schedule for process
definition: ", e);
+ throw new DolphinScheduleException(UNEXPECTED_ERROR,
+ String.format("Unexpected wrong in creating schedule for
process definition: %s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Online the schedule for process definition in DolphinScheduler.
+ */
+ public boolean onlineScheduleForProcessDef(String scheduleUrl, int
scheduleId, String token) {
+ try {
+ return
DolphinScheduleUtils.onlineScheduleForProcessDef(scheduleUrl, scheduleId,
token);
+ } catch (Exception e) {
+ LOGGER.error("Unexpected wrong in online process definition: ", e);
+ throw new DolphinScheduleException(UNEXPECTED_ERROR,
+ String.format("Unexpected wrong in online process
definition: %s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Delete the process definition in DolphinScheduler.
+ */
+ public void deleteProcessDef(String processDefUrl, String token, long
processDefCode) {
+ try {
+ DolphinScheduleUtils.delete(processDefUrl, token, processDefCode);
+ } catch (Exception e) {
+ LOGGER.error("Unexpected wrong in deleting process definition: ",
e);
+ throw new DolphinScheduleException(UNEXPECTED_ERROR,
+ String.format("Unexpected wrong in deleting process
definition: %s", e.getMessage()));
+ }
+ }
+
+ /**
+ * Delete the project in DolphinScheduler.
+ */
+ public void deleteProject(String projectBaseUrl, String token, long
projectCode) {
+ try {
+ DolphinScheduleUtils.delete(projectBaseUrl, token, projectCode);
+ } catch (Exception e) {
+ LOGGER.error("Unexpected wrong in deleting project definition: ",
e);
+ throw new DolphinScheduleException(UNEXPECTED_ERROR,
+ String.format("Unexpected wrong in deleting project
definition: %s", e.getMessage()));
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
new file mode 100644
index 0000000000..87cb1c5127
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
@@ -0,0 +1,790 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.common.bounded.BoundaryType;
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import
org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskDefinition;
+import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskParams;
+import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskRelation;
+import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DScheduleInfo;
+import org.apache.inlong.manager.schedule.ScheduleUnit;
+import org.apache.inlong.manager.schedule.exception.DolphinScheduleException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.core.util.CronExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_CODE;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_NO;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_SIZE;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_SCHEDULE_TIME_FORMAT;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_DESC;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_GEN_NUM;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_NAME;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TIMEZONE_ID;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ID;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_URL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_NO;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_SIZE;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_CODE;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_DESC;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_NAME;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_DESC;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_NAME;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RELEASE_STATE;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RELEASE_URL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_DATA;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_NAME;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_TOTAL_LIST;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_DEF;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SEARCH_VAL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_DEFINITION;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_GEN_NUM;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_RELATION;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TOKEN;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.DELETION_FAILED;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.GEN_TASK_CODE_FAILED;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.HTTP_REQUEST_FAILED;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.INVALID_HTTP_METHOD;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.JSON_PARSE_ERROR;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.NETWORK_ERROR;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_CREATION_FAILED;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_QUERY_FAILED;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_RELEASE_FAILED;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROJECT_CREATION_FAILED;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.SCHEDULE_CREATION_FAILED;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.SCHEDULE_ONLINE_FAILED;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNEXPECTED_ERROR;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNIQUE_CHECK_FAILED;
+import static
org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNSUPPORTED_SCHEDULE_TYPE;
+
+/**
+ * DolphinScheduler utils
+ * A utility class for interacting with DolphinScheduler API.
+ */
+public class DolphinScheduleUtils {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DolphinScheduleEngine.class);
+
+ private static final String POST = "POST";
+ private static final String GET = "GET";
+ private static final String DELETE = "DELETE";
+ private static final long MILLIS_IN_SECOND = 1000L;
+ private static final long MILLIS_IN_MINUTE = 60 * MILLIS_IN_SECOND;
+ private static final long MILLIS_IN_HOUR = 60 * MILLIS_IN_MINUTE;
+ private static final long MILLIS_IN_DAY = 24 * MILLIS_IN_HOUR;
+ private static final long MILLIS_IN_WEEK = 7 * MILLIS_IN_DAY;
+ private static final long MILLIS_IN_MONTH = 30 * MILLIS_IN_DAY;
+ private static final long MILLIS_IN_YEAR = 365 * MILLIS_IN_DAY;
+ private static final String CONTENT_TYPE = "Content-Type:
application/json; charset=utf-8";
+ private static final String SHELL_REQUEST_API =
"/inlong/manager/api/group/submitOfflineJob";
+ private static final OkHttpClient CLIENT = new OkHttpClient();
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private DolphinScheduleUtils() {
+ }
+
+ /**
+ * Checks the uniqueness of a project ID based on the given search value.
+ *
+ * @param url The base URL of the DolphinScheduler API.
+ * @param token The authentication token to be used in the request
header.
+ * @param searchVal The name of the project to search for.
+ * @return The unique project ID if found, or 0 if not found or an error
occurs.
+ */
+ public static long checkAndGetUniqueId(String url, String token, String
searchVal) {
+ try {
+ Map<String, String> header = buildHeader(token);
+ Map<String, String> queryParams = buildPageParam(searchVal);
+
+ JsonObject response = executeHttpRequest(url, GET, queryParams,
header);
+
+ JsonObject data = response.get(DS_RESPONSE_DATA).getAsJsonObject();
+ JsonArray totalList = data.getAsJsonArray(DS_RESPONSE_TOTAL_LIST);
+
+ // check uniqueness
+ if (totalList != null && totalList.size() == 1) {
+ JsonObject project = totalList.get(0).getAsJsonObject();
+ String name = project.get(DS_RESPONSE_NAME).getAsString();
+ if (name.equals(searchVal)) {
+ return project.get(DS_CODE).getAsLong();
+ }
+ }
+ return 0;
+ } catch (JsonParseException e) {
+ LOGGER.error("JsonParseException during checkAndGetUniqueId", e);
+ throw new DolphinScheduleException(JSON_PARSE_ERROR,
+ String.format("Error parsing json during unique ID check
for: %s at URL: %s", searchVal, url), e);
+
+ } catch (DolphinScheduleException e) {
+ LOGGER.error("DolphinScheduleException during unique ID check:
{}", e.getDetailedMessage(), e);
+ throw new DolphinScheduleException(UNIQUE_CHECK_FAILED,
+ String.format("Error checking unique ID for %s at URL:
%s", searchVal, url));
+ }
+ }
+
+ /**
+ * Creates a new project in DolphinScheduler.
+ *
+ * @param url The base URL of the DolphinScheduler API.
+ * @param token The authentication token to be used in the request
header.
+ * @param projectName The name of the new project.
+ * @param description The description of the new project.
+ * @return The project code (ID) if creation is successful, or 0 if an
error occurs.
+ */
+ public static long creatProject(String url, String token, String
projectName,
+ String description) {
+ try {
+ Map<String, String> header = buildHeader(token);
+
+ Map<String, String> queryParams = new HashMap<>();
+ queryParams.put(DS_PROJECT_NAME, projectName);
+ queryParams.put(DS_PROJECT_DESC, description);
+
+ JsonObject response = executeHttpRequest(url, POST, queryParams,
header);
+
+ JsonObject data = response.get(DS_RESPONSE_DATA).getAsJsonObject();
+ LOGGER.info("create project success, project data: {}", data);
+
+ return data != null ? data.get(DS_CODE).getAsLong() : 0;
+ } catch (JsonParseException e) {
+ LOGGER.error("JsonParseException during creating project", e);
+ throw new DolphinScheduleException(
+ JSON_PARSE_ERROR,
+ String.format("Error creating project with name: %s and
description: %s at URL: %s",
+ projectName, description, url));
+
+ } catch (DolphinScheduleException e) {
+ LOGGER.error("Creating project failed: {}", e.getMessage());
+ throw new DolphinScheduleException(
+ PROJECT_CREATION_FAILED,
+ String.format("Error creating project with name: %s and
description: %s at URL: %s",
+ projectName, description, url));
+ }
+ }
+
+ /**
+ * Query all process definition in project
+ *
+ * @param url The base URL of the DolphinScheduler API.
+ * @param token The authentication token to be used in the request header.
+ * @return Map of all the process definition
+ */
+ public static Map<Long, String> queryAllProcessDef(String url, String
token) {
+ Map<String, String> header = buildHeader(token);
+ try {
+ JsonObject response = executeHttpRequest(url, GET, new
HashMap<>(), header);
+
+ Map<Long, String> processDef =
+
StreamSupport.stream(response.get(DS_RESPONSE_DATA).getAsJsonArray().spliterator(),
false)
+ .map(JsonElement::getAsJsonObject)
+ .collect(Collectors.toMap(
+ jsonObject ->
jsonObject.get(DS_CODE).getAsLong(),
+ jsonObject ->
jsonObject.get(DS_PROCESS_NAME).getAsString()));
+
+ LOGGER.info("Query all process definition success, processes info:
{}", processDef);
+ return processDef;
+
+ } catch (JsonParseException e) {
+ LOGGER.error("JsonParseException during query all process
definition", e);
+ throw new DolphinScheduleException(
+ JSON_PARSE_ERROR,
+ String.format("Error querying all process definitions at
URL: %s", url));
+
+ } catch (DolphinScheduleException e) {
+ LOGGER.info("Query all process definition failed: {}",
e.getMessage());
+ throw new DolphinScheduleException(
+ PROCESS_DEFINITION_QUERY_FAILED,
+ String.format("Error querying all process definitions at
URL: %s", url));
+ }
+
+ }
+
+ /**
+ * Generates a new task code in DolphinScheduler.
+ *
+ * @param url The base URL of the DolphinScheduler API.
+ * @param token The authentication token to be used in the request header.
+ * @return The task code (ID) if generation is successful, or 0 if an
error occurs.
+ */
+ public static long genTaskCode(String url, String token) {
+ try {
+ Map<String, String> header = buildHeader(token);
+
+ Map<String, String> queryParams = new HashMap<>();
+ queryParams.put(DS_TASK_GEN_NUM, DS_DEFAULT_TASK_GEN_NUM);
+
+ JsonObject response = executeHttpRequest(url, GET, queryParams,
header);
+
+ JsonArray data = response.get(DS_RESPONSE_DATA).getAsJsonArray();
+
+ LOGGER.info("Generate task code success, task code data: {}",
data);
+ return data != null && data.size() == 1 ? data.get(0).getAsLong()
: 0;
+
+ } catch (JsonParseException e) {
+ LOGGER.error("JsonParseException during generate task code", e);
+ throw new DolphinScheduleException(
+ JSON_PARSE_ERROR,
+ String.format("Error generate task code at URL: %s", url));
+
+ } catch (DolphinScheduleException e) {
+ LOGGER.info("generate task code failed: {}", e.getMessage());
+ throw new DolphinScheduleException(
+ GEN_TASK_CODE_FAILED,
+ String.format("Error generate task code at URL: %s", url));
+ }
+ }
+
+ /**
+ * Creates a process definition in DolphinScheduler.
+ *
+ * @param url The base URL of the DolphinScheduler API.
+ * @param token The authentication token to be used in the request
header.
+ * @param name The name of the process definition.
+ * @param desc The description of the process definition.
+ * @param taskCode The task code to be associated with this process
definition.
+ * @param host The host where the process will run.
+ * @param port The port where the process will run.
+ * @param username The username for authentication.
+ * @param password The password for authentication.
+ * @param offset The offset for the scheduling.
+ * @param groupId The group ID of the process.
+ * @return The process definition code (ID) if creation is successful, or
0 if an error occurs.
+ */
+ public static long createProcessDef(String url, String token, String name,
String desc,
+ long taskCode, String host,
+ int port, String username, String password, long offset, String
groupId) throws Exception {
+ try {
+ Map<String, String> header = buildHeader(token);
+
+ DSTaskRelation taskRelation = new DSTaskRelation();
+ taskRelation.setPostTaskCode(taskCode);
+ String taskRelationJson =
MAPPER.writeValueAsString(Collections.singletonList(taskRelation));
+
+ DSTaskParams taskParams = new DSTaskParams();
+ taskParams.setRawScript(buildScript(host, port, username,
password, offset, groupId));
+
+ DSTaskDefinition taskDefinition = new DSTaskDefinition();
+ taskDefinition.setCode(taskCode);
+ taskDefinition.setName(DS_DEFAULT_TASK_NAME);
+ taskDefinition.setDescription(DS_DEFAULT_TASK_DESC);
+ taskDefinition.setTaskParams(taskParams);
+ String taskDefinitionJson =
MAPPER.writeValueAsString(Collections.singletonList(taskDefinition));
+
+ Map<String, String> queryParams = new HashMap<>();
+ queryParams.put(DS_TASK_RELATION, taskRelationJson);
+ queryParams.put(DS_TASK_DEFINITION, taskDefinitionJson);
+ queryParams.put(DS_PROCESS_NAME, name);
+ queryParams.put(DS_PROCESS_DESC, desc);
+
+ JsonObject data = executeHttpRequest(url, POST, queryParams,
header);
+
+ LOGGER.info("create process definition success, process definition
data: {}", data);
+ return data != null ?
data.get(DS_RESPONSE_DATA).getAsJsonObject().get(DS_CODE).getAsLong() : 0;
+ } catch (JsonParseException e) {
+ LOGGER.error("JsonParseException during creating process
definition", e);
+ throw new DolphinScheduleException(
+ JSON_PARSE_ERROR,
+ String.format("Error creating process definition with
name: %s and description: %s at URL: %s",
+ name, desc, url));
+
+ } catch (DolphinScheduleException e) {
+ throw new DolphinScheduleException(
+ PROCESS_DEFINITION_CREATION_FAILED,
+ String.format("Error creating process definition with
name: %s and description: %s at URL: %s",
+ name, desc, url));
+ }
+ }
+
+ /**
+ * Releases a process definition in DolphinScheduler.
+ *
+ * @param processDefUrl The URL to release the process definition.
+ * @param processDefCode The ID of the process definition.
+ * @param token The authentication token to be used in the
request header.
+ * @param status The status to set for the process definition
(e.g., "online" or "offline").
+ * @return true if the process definition was successfully released, false
otherwise.
+ */
+ public static boolean releaseProcessDef(String processDefUrl, long
processDefCode,
+ String token, String status) {
+ try {
+ String url = processDefUrl + "/" + processDefCode + DS_RELEASE_URL;
+ Map<String, String> header = buildHeader(token);
+
+ Map<String, String> queryParam = new HashMap<>();
+ queryParam.put(DS_RELEASE_STATE, status);
+
+ JsonObject response = executeHttpRequest(url, POST, queryParam,
header);
+ LOGGER.info("release process definition success, response data:
{}", response);
+
+ return response.get(DS_RESPONSE_DATA).getAsBoolean();
+
+ } catch (JsonParseException e) {
+ LOGGER.error("JsonParseException during releasing process
definition", e);
+ throw new DolphinScheduleException(
+ JSON_PARSE_ERROR,
+ String.format("Error releasing process definition with
code: %d and status: %s at URL: %s",
+ processDefCode, status, processDefUrl));
+
+ } catch (DolphinScheduleException e) {
+ throw new DolphinScheduleException(
+ PROCESS_DEFINITION_RELEASE_FAILED,
+ String.format("Error releasing process definition with
code: %d and status: %s at URL: %s",
+ processDefCode, status, processDefUrl));
+ }
+ }
+
+ /**
+ * Create a schedule for process definition in DolphinScheduler.
+ *
+ * @param url The URL to create a schedule for the process definition.
+ * @param processDefCode The ID of the process definition.
+ * @param token The authentication token to be used in the
request header.
+ * @param scheduleInfo The schedule info
+ * @return The schedule id
+ */
+ public static int createScheduleForProcessDef(String url, long
processDefCode,
+ String token, ScheduleInfo scheduleInfo) throws Exception {
+
+ try {
+ Map<String, String> header = buildHeader(token);
+
+ DateTimeFormatter formatter =
DateTimeFormatter.ofPattern(DS_DEFAULT_SCHEDULE_TIME_FORMAT);
+ String startTime = scheduleInfo.getStartTime().toLocalDateTime()
+
.atZone(ZoneId.of(DS_DEFAULT_TIMEZONE_ID)).format(formatter);
+ String endTime = scheduleInfo.getEndTime().toLocalDateTime()
+
.atZone(ZoneId.of(DS_DEFAULT_TIMEZONE_ID)).format(formatter);
+
+ String crontab;
+ switch (scheduleInfo.getScheduleType()) {
+ case 0:
+ crontab =
generateCrontabExpression(scheduleInfo.getScheduleUnit(),
+ scheduleInfo.getScheduleInterval());
+ break;
+
+ case 1:
+ crontab = scheduleInfo.getCrontabExpression();
+ break;
+
+ default:
+ LOGGER.error("Unsupported schedule type: {}",
scheduleInfo.getScheduleType());
+ throw new DolphinScheduleException("Unsupported schedule
type: " + scheduleInfo.getScheduleType());
+ }
+
+ DScheduleInfo dScheduleInfo = new DScheduleInfo();
+ dScheduleInfo.setStartTime(startTime);
+ dScheduleInfo.setEndTime(endTime);
+ dScheduleInfo.setCrontab(crontab);
+ dScheduleInfo.setTimezoneId(DS_DEFAULT_TIMEZONE_ID);
+ String scheduleDef = MAPPER.writeValueAsString(dScheduleInfo);
+
+ Map<String, String> queryParams = new HashMap<>();
+ queryParams.put(DS_PROCESS_CODE, String.valueOf(processDefCode));
+ queryParams.put(DS_SCHEDULE_DEF, scheduleDef);
+
+ JsonObject response = executeHttpRequest(url, POST, queryParams,
header);
+ LOGGER.info("create schedule for process definition success,
response data: {}", response);
+
+ return
response.get(DS_RESPONSE_DATA).getAsJsonObject().get(DS_ID).getAsInt();
+
+ } catch (JsonParseException e) {
+ LOGGER.error("JsonParseException during releasing process
definition", e);
+ throw new DolphinScheduleException(
+ JSON_PARSE_ERROR,
+ String.format("Error creating schedule for process
definition code: %d at URL: %s",
+ processDefCode, url));
+
+ } catch (DolphinScheduleException e) {
+ throw new DolphinScheduleException(
+ SCHEDULE_CREATION_FAILED,
+ String.format("Error creating schedule for process
definition code: %d at URL: %s",
+ processDefCode, url));
+ }
+ }
+
+ /**
+ * Online the schedule for process definition in DolphinScheduler.
+ *
+ * @param scheduleUrl The URL to online the schedule for process
definition.
+ * @param scheduleId The ID of the schedule of process definition.
+ * @param token The authentication token to be used in the
request header.
+ * @return whether online is succeeded
+ */
+ public static boolean onlineScheduleForProcessDef(String scheduleUrl, int
scheduleId,
+ String token) {
+ try {
+ Map<String, String> header = buildHeader(token);
+
+ String url = scheduleUrl + "/" + scheduleId + DS_ONLINE_URL;
+ JsonObject response = executeHttpRequest(url, POST, new
HashMap<>(), header);
+ LOGGER.info("online schedule for process definition success,
response data: {}", response);
+
+ if (response != null &&
!response.get(DS_RESPONSE_DATA).isJsonNull()) {
+ return response.get(DS_RESPONSE_DATA).getAsBoolean();
+ }
+ return false;
+
+ } catch (JsonParseException e) {
+ LOGGER.error("JsonParseException during online schedule", e);
+ throw new DolphinScheduleException(
+ JSON_PARSE_ERROR,
+ String.format("Error online schedule with ID: %d online at
URL: %s", scheduleId, scheduleUrl));
+
+ } catch (DolphinScheduleException e) {
+ throw new DolphinScheduleException(
+ SCHEDULE_ONLINE_FAILED,
+ String.format("Error online schedule with ID: %d online at
URL: %s", scheduleId, scheduleUrl));
+ }
+ }
+
+ /**
+ * Delete the process definition in DolphinScheduler.
+ *
+ * @param url The URL to delete the project or process definition.
+ * @param token The authentication token to be used in the
request header.
+ * @param code The project code or process definition code
+ */
+ public static void delete(String url, String token, long code) {
+ try {
+ Map<String, String> header = buildHeader(token);
+
+ String requestUrl = url + "/" + code;
+
+ JsonObject response = executeHttpRequest(requestUrl, DELETE, new
HashMap<>(), header);
+ LOGGER.info("delete process or project success, response data:
{}", response);
+
+ } catch (JsonParseException e) {
+ LOGGER.error("JsonParseException during deleting process or
project", e);
+ throw new DolphinScheduleException(
+ JSON_PARSE_ERROR,
+ String.format("Error deleting process or project with
code: %d at URL: %s", code, url), e);
+
+ } catch (DolphinScheduleException e) {
+ throw new DolphinScheduleException(
+ DELETION_FAILED,
+ String.format("Error deleting process or project with
code: %d at URL: %s", code, url), e);
+ }
+ }
+
+ /**
+ * Builds the header map for HTTP requests, including the authentication
token.
+ *
+ * @param token The authentication token for the request.
+ * @return A map representing the headers of the HTTP request.
+ */
+ private static Map<String, String> buildHeader(String token) {
+ Map<String, String> headers = new HashMap<>();
+ if (StringUtils.isNotEmpty(token)) {
+ headers.put(DS_TOKEN, token);
+ }
+ return headers;
+ }
+
+ /**
+ * Builds a query parameter map used for API calls that need to paginate
or filter results.
+ * This method can be used for searching projects or tasks.
+ *
+ * @param searchVal The value to search for.
+ * @return A map containing the necessary query parameters.
+ */
+ private static Map<String, String> buildPageParam(String searchVal) {
+ Map<String, String> queryParams = new HashMap<>();
+ queryParams.put(DS_SEARCH_VAL, searchVal);
+ queryParams.put(DS_PAGE_SIZE, DS_DEFAULT_PAGE_SIZE);
+ queryParams.put(DS_PAGE_NO, DS_DEFAULT_PAGE_NO);
+ return queryParams;
+ }
+
+ /**
+ * Calculate the offset according to schedule info
+ *
+ * @param scheduleInfo The schedule info
+ * @return timestamp between two schedule task
+ */
+ public static long calculateOffset(ScheduleInfo scheduleInfo) {
+ if (scheduleInfo == null) {
+ LOGGER.error("ScheduleInfo cannot be null");
+ throw new DolphinScheduleException("ScheduleInfo cannot be null");
+ }
+
+ long offset = 0;
+
+ // Determine offset based on schedule type
+ if (scheduleInfo.getScheduleType() == null) {
+ LOGGER.error("Schedule type cannot be null");
+ throw new DolphinScheduleException("Schedule type cannot be null");
+ }
+
+ switch (scheduleInfo.getScheduleType()) {
+ case 0: // Normal scheduling
+ offset = calculateNormalOffset(scheduleInfo);
+ break;
+ case 1: // Crontab scheduling
+ offset = calculateCronOffset(scheduleInfo);
+ break;
+ default:
+ LOGGER.error("Invalid schedule type");
+ throw new DolphinScheduleException(
+ UNSUPPORTED_SCHEDULE_TYPE, "Invalid schedule type");
+ }
+
+ // Add delay time if specified
+ if (scheduleInfo.getDelayTime() != null) {
+ offset += scheduleInfo.getDelayTime() * MILLIS_IN_SECOND;
+ }
+
+ return offset;
+ }
+
+ private static long calculateNormalOffset(ScheduleInfo scheduleInfo) {
+ if (scheduleInfo.getScheduleInterval() == null ||
scheduleInfo.getScheduleUnit() == null) {
+ LOGGER.error("Schedule interval and unit cannot be null for normal
scheduling");
+ throw new IllegalArgumentException("Schedule interval and unit
cannot be null for normal scheduling");
+ }
+ switch
(Objects.requireNonNull(ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit())))
{
+ case YEAR:
+ return scheduleInfo.getScheduleInterval() * MILLIS_IN_YEAR;
+ case MONTH:
+ return scheduleInfo.getScheduleInterval() * MILLIS_IN_MONTH;
+ case WEEK:
+ return scheduleInfo.getScheduleInterval() * MILLIS_IN_WEEK;
+ case DAY:
+ return scheduleInfo.getScheduleInterval() * MILLIS_IN_DAY;
+ case HOUR:
+ return scheduleInfo.getScheduleInterval() * MILLIS_IN_HOUR;
+ case MINUTE:
+ return scheduleInfo.getScheduleInterval() * MILLIS_IN_MINUTE;
+ case SECOND:
+ return scheduleInfo.getScheduleInterval() * MILLIS_IN_SECOND;
+ case ONE_ROUND:
+ return scheduleInfo.getScheduleInterval();
+ default:
+ LOGGER.error("Invalid schedule unit");
+ throw new DolphinScheduleException("Invalid schedule unit");
+ }
+ }
+
+ private static long calculateCronOffset(ScheduleInfo scheduleInfo) {
+ if (scheduleInfo.getCrontabExpression() == null) {
+ LOGGER.error("Crontab expression cannot be null for schedule type
crontab");
+ throw new DolphinScheduleException("Crontab expression cannot be
null for schedule type crontab");
+ }
+
+ try {
+ CronExpression cronExpression = new
CronExpression(scheduleInfo.getCrontabExpression());
+ Date firstExecution = cronExpression.getNextValidTimeAfter(new
Date());
+ Date secondExecution =
cronExpression.getNextValidTimeAfter(firstExecution);
+
+ if (secondExecution != null) {
+ return secondExecution.getTime() - firstExecution.getTime();
+ } else {
+ LOGGER.error("Unable to calculate the next execution times for
the cron expression");
+ throw new DolphinScheduleException(
+ "Unable to calculate the next execution times for the
cron expression");
+ }
+ } catch (Exception e) {
+ LOGGER.error("Invalid cron expression: ", e);
+ throw new DolphinScheduleException(String.format("Invalid cron
expression: %s", e.getMessage()));
+ }
+ }
+
+ private static String generateCrontabExpression(String scheduleUnit,
Integer scheduleInterval) {
+ if (scheduleUnit.isEmpty()) {
+ LOGGER.error("Schedule unit and interval must not be null for
generating crontab expression");
+ throw new DolphinScheduleException(
+ "Schedule unit and interval must not be null for
generating crontab expression");
+ }
+ String crontabExpression;
+
+ switch
(Objects.requireNonNull(ScheduleUnit.getScheduleUnit(scheduleUnit))) {
+ case SECOND:
+ crontabExpression = String.format("0/%d * * * * ? *",
scheduleInterval);
+ break;
+ case MINUTE:
+ crontabExpression = String.format("* 0/%d * * * ? *",
scheduleInterval);
+ break;
+ case HOUR:
+ crontabExpression = String.format("* * 0/%d * * ? *",
scheduleInterval);
+ break;
+ case DAY:
+ crontabExpression = String.format("* * * 1/%d * ? *",
scheduleInterval);
+ break;
+ case WEEK:
+ crontabExpression = String.format("* * * 1/%d * ? *",
scheduleInterval * 7);
+ break;
+ case MONTH:
+ crontabExpression = String.format("* * * * 0/%d ? *",
scheduleInterval);
+ break;
+ case YEAR:
+ crontabExpression = String.format("* * * * * ? 0/%d",
scheduleInterval);
+ break;
+ default:
+ LOGGER.error("Unsupported schedule unit for generating
crontab: {}", scheduleUnit);
+ throw new DolphinScheduleException("Unsupported schedule unit
for generating crontab: " + scheduleUnit);
+ }
+
+ return crontabExpression;
+ }
+
+ /**
+ * Executes an HTTP request using OkHttp. Supports various HTTP methods
(GET, POST, PUT, DELETE).
+ *
+ * @param url The URL of the request.
+ * @param method The HTTP method (GET, POST, PUT, DELETE).
+ * @param queryParams The query parameters for the request (optional).
+ * @param headers The headers for the request.
+ * @return A JsonObject containing the response from the server.
+ * @throws DolphinScheduleException If an error occurs during the request.
+ */
+ private static JsonObject executeHttpRequest(String url, String method,
Map<String, String> queryParams,
+ Map<String, String> headers) {
+ HttpUrl.Builder urlBuilder =
Objects.requireNonNull(HttpUrl.parse(url)).newBuilder();
+
+ for (Map.Entry<String, String> entry : queryParams.entrySet()) {
+ urlBuilder.addQueryParameter(entry.getKey(), entry.getValue());
+ }
+ HttpUrl httpUrl = urlBuilder.build();
+
+ Request.Builder requestBuilder = new Request.Builder()
+ .url(httpUrl);
+
+ for (Map.Entry<String, String> entry : headers.entrySet()) {
+ requestBuilder.addHeader(entry.getKey(), entry.getValue());
+ }
+ RequestBody body = RequestBody.create(MediaType.parse(CONTENT_TYPE),
"");
+
+ switch (method.toUpperCase()) {
+ case POST:
+ requestBuilder.post(body);
+ break;
+ case GET:
+ requestBuilder.get();
+ break;
+ case DELETE:
+ requestBuilder.delete(body);
+ break;
+ default:
+ throw new DolphinScheduleException(INVALID_HTTP_METHOD,
+ String.format("Unsupported request method: %s",
method));
+ }
+
+ Request request = requestBuilder.build();
+
+ // get response
+ try (Response response = CLIENT.newCall(request).execute()) {
+ String responseBody = response.body() != null ?
response.body().string() : null;
+ LOGGER.debug("HTTP request to {} completed with status code {}",
httpUrl, response.code());
+
+ if (response.isSuccessful() && responseBody != null) {
+ return JsonParser.parseString(responseBody).getAsJsonObject();
+ } else {
+ LOGGER.error("HTTP request to {} failed. HTTP Status: {},
Response Body: {}", httpUrl, response.code(),
+ responseBody != null ? responseBody : "No response
body");
+
+ throw new DolphinScheduleException(
+ HTTP_REQUEST_FAILED,
+ String.format("HTTP request to %s failed. Status: %d,
Response: %s",
+ httpUrl, response.code(), responseBody != null
? responseBody : "No response body"));
+ }
+ } catch (IOException e) {
+ throw new DolphinScheduleException(
+ NETWORK_ERROR,
+ String.format("Network error during HTTP request to %s.
Reason: %s", httpUrl, e.getMessage()));
+ } catch (Exception e) {
+ throw new DolphinScheduleException(
+ UNEXPECTED_ERROR,
+ String.format("Unexpected error during HTTP request to %s.
Reason: %s", httpUrl, e.getMessage()));
+ }
+ }
+
+ /**
+ * Shell node in DolphinScheduler need to write in a script
+ * When process definition schedule run, the shell node run,
+ * Call back in inlong, sending a request with parameters required
+ */
+ private static String buildScript(String host, int port, String username,
String password, long offset,
+ String groupId) {
+ LOGGER.info("build script for host: {}, port: {}, username: {},
password: {}, offset: {}, groupId: {}", host,
+ port, username, password, offset, groupId);
+ return "#!/bin/bash\n\n" +
+
+ // Get current timestamp
+ "# Get current timestamp\n" +
+ "lowerBoundary=$(date +%s)\n" +
+ "echo \"get lowerBoundary: ${lowerBoundary}\"\n" +
+ "upperBoundary=$(($lowerBoundary + " + offset + "))\n" +
+ "echo \"get upperBoundary: ${upperBoundary}\"\n\n" +
+
+ // Set URL
+ "# Set URL and HTTP method\n" +
+ "url=\"http://" + host + ":" + port + SHELL_REQUEST_API +
+ "?username=" + username + "&password=" + password + "\"\n" +
+ "echo \"get url: ${url}\"\n" +
+
+ // Set HTTP method
+ "httpMethod=\"POST\"\n\n" +
+
+ // Set request body
+ "# Build request body\n" +
+ "jsonBody=$(cat <<EOF\n" +
+ "{\n" +
+ " \"boundaryType\": \"" + BoundaryType.TIME.getType() +
"\",\n" +
+ " \"groupId\": \"" + groupId + "\",\n" +
+ " \"lowerBoundary\": \"${lowerBoundary}\",\n" +
+ " \"upperBoundary\": \"${upperBoundary}\"\n" +
+ "}\n" +
+ "EOF\n)\n\n" +
+ "echo \"${jsonBody}\"\n\n" +
+
+ // Send request
+ "# Send request\n" +
+ "response=$(curl -s -X \"$httpMethod\" \"$url\" \\\n" +
+ " -H \"" + CONTENT_TYPE + "\" \\\n" +
+ " -d \"$jsonBody\")\n\n" +
+
+ // Log response
+ "# Log response\n" +
+ "echo \"Request Sending success, Response: $response\"";
+ }
+
+}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java
new file mode 100644
index 0000000000..348697b672
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/DolphinScheduleException.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.exception;
+
+import lombok.Getter;
+
+/**
+ * Custom exception for DolphinScheduler operations.
+ * Provides error codes, detailed messages, and optional nested exceptions.
+ */
+@Getter
+public class DolphinScheduleException extends RuntimeException {
+
+ // Common error codes
+ public static final String UNIQUE_CHECK_FAILED = "UNIQUE_CHECK_FAILED";
+ public static final String JSON_PARSE_ERROR = "JSON_PARSE_ERROR";
+ public static final String DELETION_FAILED = "DELETION_FAILED";
+ public static final String INVALID_HTTP_METHOD = "INVALID_HTTP_METHOD";
+ public static final String HTTP_REQUEST_FAILED = "HTTP_REQUEST_FAILED";
+ public static final String NETWORK_ERROR = "NETWORK_ERROR";
+ public static final String UNEXPECTED_ERROR = "UNEXPECTED_ERROR";
+
+ // Project-related error codes
+ public static final String PROJECT_CREATION_FAILED =
"PROJECT_CREATION_FAILED";
+
+ // TaskCode-related error codes
+ public static final String GEN_TASK_CODE_FAILED = "GEN_TASK_CODE_FAILED";
+
+ // Process-related error codes
+ public static final String PROCESS_DEFINITION_QUERY_FAILED =
"PROCESS_DEFINITION_QUERY_FAILED";
+ public static final String PROCESS_DEFINITION_CREATION_FAILED =
"PROCESS_DEFINITION_CREATION_FAILED";
+ public static final String PROCESS_DEFINITION_RELEASE_FAILED =
"PROCESS_DEFINITION_RELEASE_FAILED";
+ public static final String SCHEDULE_CREATION_FAILED =
"SCHEDULE_CREATION_FAILED";
+ public static final String SCHEDULE_ONLINE_FAILED =
"SCHEDULE_ONLINE_FAILED";
+ public static final String UNSUPPORTED_SCHEDULE_TYPE =
"UNSUPPORTED_SCHEDULE_TYPE";
+
+ private final String errorCode;
+ private final String detailedMessage;
+
+ /**
+ * Constructor with message only.
+ *
+ * @param message The error message.
+ */
+ public DolphinScheduleException(String message) {
+ this(null, message, null);
+ }
+
+ /**
+ * Constructor with message and cause.
+ *
+ * @param message The error message.
+ * @param cause The underlying cause of the exception.
+ */
+ public DolphinScheduleException(String message, Throwable cause) {
+ this(null, message, cause);
+ }
+
+ /**
+ * Constructor with error code, message, and cause.
+ *
+ * @param errorCode A specific error code for the exception.
+ * @param detailedMessage A detailed error message providing additional
context.
+ * @param cause The underlying cause of the exception (optional).
+ */
+ public DolphinScheduleException(String errorCode, String detailedMessage,
Throwable cause) {
+ super(detailedMessage, cause);
+ this.errorCode = errorCode;
+ this.detailedMessage = detailedMessage;
+ }
+
+ /**
+ * Constructor with error code and message.
+ *
+ * @param errorCode A specific error code for the exception.
+ * @param detailedMessage A detailed error message providing additional
context.
+ */
+ public DolphinScheduleException(String errorCode, String detailedMessage) {
+ this(errorCode, detailedMessage, null);
+ }
+
+ @Override
+ public String toString() {
+ return "DolphinScheduleException{" +
+ "errorCode='" + errorCode + '\'' +
+ ", detailedMessage='" + detailedMessage + '\'' +
+ ", cause=" + getCause() +
+ '}';
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleContainerTestEnv.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleContainerTestEnv.java
new file mode 100644
index 0000000000..c57bd6e783
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleContainerTestEnv.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.manager.schedule.BaseScheduleTest;
+import org.apache.inlong.manager.schedule.exception.DolphinScheduleException;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.http.HttpHeaders.CONTENT_TYPE;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TIMEZONE_ID;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_DATA;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_COOKIE;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_COOKIE_SC_TYPE;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_COOKIE_SESSION_ID;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_DEFAULT_PASSWORD;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_DEFAULT_SERVICE_URL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_DEFAULT_USERID;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_DEFAULT_USERNAME;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_EXPIRE_TIME;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_EXPIRE_TIME_FORMAT;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_IMAGE_NAME;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_IMAGE_TAG;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_LOGIN_URL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_PASSWORD;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_RESPONSE_TOKEN;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_TOKEN_GEN_URL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_TOKEN_URL;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_USERID;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_USERNAME;
+import static
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.INTER_CONTAINER_DS_ALIAS;
+
+public abstract class DolphinScheduleContainerTestEnv extends BaseScheduleTest
{
+
+ private static final Logger DS_LOG =
LoggerFactory.getLogger(DolphinScheduleEngineTest.class);
+
+ private static final Network NETWORK = Network.newNetwork();
+
+ protected static final GenericContainer<?> dolphinSchedulerContainer =
+ new GenericContainer<>(DS_IMAGE_NAME + ":" + DS_IMAGE_TAG)
+ .withExposedPorts(12345, 25333)
+ .withEnv("TZ", DS_DEFAULT_TIMEZONE_ID)
+ .withNetwork(NETWORK)
+ .withAccessToHost(true)
+ .withNetworkAliases(INTER_CONTAINER_DS_ALIAS)
+ .withLogConsumer(new Slf4jLogConsumer(DS_LOG));
+
+ /**
+ * This method just for DS testing, login by default admin username and
password
+ * generate a 1-day expiring token for test, the token will disappear with
the DS container shutting down
+ *
+ * @return the DS token
+ */
+ protected static String accessToken() {
+ Map<String, String> loginParams = new HashMap<>();
+ loginParams.put(DS_USERNAME, DS_DEFAULT_USERNAME);
+ loginParams.put(DS_PASSWORD, DS_DEFAULT_PASSWORD);
+ try {
+ JsonObject loginResponse =
+ executeHttpRequest(DS_DEFAULT_SERVICE_URL + DS_LOGIN_URL,
loginParams, new HashMap<>());
+ if (loginResponse.get("success").getAsBoolean()) {
+ String tokenGenUrl = DS_DEFAULT_SERVICE_URL + DS_TOKEN_URL +
DS_TOKEN_GEN_URL;
+ Map<String, String> tokenParams = new HashMap<>();
+ tokenParams.put(DS_USERID, String.valueOf(DS_DEFAULT_USERID));
+
+ LocalDateTime now = LocalDateTime.now();
+ LocalDateTime tomorrow = now.plusDays(1);
+ DateTimeFormatter formatter =
DateTimeFormatter.ofPattern(DS_EXPIRE_TIME_FORMAT);
+ String expireTime = tomorrow.format(formatter);
+ tokenParams.put(DS_EXPIRE_TIME, expireTime);
+
+ Map<String, String> cookies = new HashMap<>();
+ cookies.put(DS_COOKIE_SC_TYPE,
loginResponse.get(DS_RESPONSE_DATA)
+
.getAsJsonObject().get(DS_COOKIE_SC_TYPE).getAsString());
+ cookies.put(DS_COOKIE_SESSION_ID,
loginResponse.get(DS_RESPONSE_DATA)
+
.getAsJsonObject().get(DS_COOKIE_SESSION_ID).getAsString());
+
+ JsonObject tokenGenResponse = executeHttpRequest(tokenGenUrl,
tokenParams, cookies);
+
+ String accessTokenUrl = DS_DEFAULT_SERVICE_URL + DS_TOKEN_URL;
+ tokenParams.put(DS_RESPONSE_TOKEN,
tokenGenResponse.get(DS_RESPONSE_DATA).getAsString());
+ JsonObject result = executeHttpRequest(accessTokenUrl,
tokenParams, cookies);
+ String token =
result.get(DS_RESPONSE_DATA).getAsJsonObject().get(DS_RESPONSE_TOKEN).getAsString();
+ DS_LOG.info("login and generate token success, token: {}",
token);
+ return token;
+ }
+ return null;
+ } catch (Exception e) {
+ DS_LOG.error("login and generate token fail: ", e);
+ throw new DolphinScheduleException(String.format("login and
generate token fail: %s", e.getMessage()));
+ }
+ }
+
+ private static JsonObject executeHttpRequest(String url, Map<String,
String> queryParams,
+ Map<String, String> cookies) throws IOException {
+ OkHttpClient client = new OkHttpClient();
+
+ // Build query parameters
+ HttpUrl.Builder urlBuilder =
Objects.requireNonNull(HttpUrl.parse(url)).newBuilder();
+ for (Map.Entry<String, String> entry : queryParams.entrySet()) {
+ urlBuilder.addQueryParameter(entry.getKey(), entry.getValue());
+ }
+ HttpUrl httpUrl = urlBuilder.build();
+
+ // Build the request
+ Request.Builder requestBuilder = new Request.Builder()
+ .url(httpUrl);
+
+ // Add cookies to the request
+ if (cookies != null && !cookies.isEmpty()) {
+ String cookieHeader = cookies.entrySet()
+ .stream()
+ .map(entry -> entry.getKey() + "=" + entry.getValue())
+ .collect(Collectors.joining("; "));
+ requestBuilder.header(DS_COOKIE, cookieHeader);
+ }
+
+ RequestBody body = RequestBody.create(MediaType.parse(CONTENT_TYPE),
"");
+ requestBuilder.post(body);
+
+ Request request = requestBuilder.build();
+
+ // Execute the request and parse the response
+ try (Response response = client.newCall(request).execute()) {
+ if (response.isSuccessful() && response.body() != null) {
+ String responseBody = response.body().string();
+ return JsonParser.parseString(responseBody).getAsJsonObject();
+ } else {
+ DS_LOG.error("Unexpected http response error: {}", response);
+ throw new DolphinScheduleException("Unexpected http response
error " + response);
+ }
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java
new file mode 100644
index 0000000000..f95a5268ee
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.Timeout;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.ComponentScan;
+
+import javax.annotation.Resource;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+@SpringBootTest(classes = DolphinScheduleEngineTest.class)
+@ComponentScan(basePackages = "org.apache.inlong.manager")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class DolphinScheduleEngineTest extends DolphinScheduleContainerTestEnv
{
+
+ @Resource
+ private DolphinScheduleEngine dolphinScheduleEngine;
+
+ @BeforeAll
+ public void beforeAll() {
+ dolphinSchedulerContainer.setPortBindings(Arrays.asList("12345:12345",
"25333:25333"));
+ dolphinSchedulerContainer.start();
+ assertTrue(dolphinSchedulerContainer.isRunning(), "DolphinScheduler
container should be running");
+
+ String token = accessToken();
+ dolphinScheduleEngine.setToken(token);
+ dolphinScheduleEngine.start();
+ }
+
+ @AfterAll
+ public void afterAll() {
+ dolphinScheduleEngine.stop();
+ if (dolphinSchedulerContainer != null) {
+ dolphinSchedulerContainer.stop();
+ }
+ }
+
+ @Test
+ @Order(1)
+ @Timeout(30)
+ public void testRegisterScheduleInfo() {
+ // 1. test for normal schedule
+ ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+ testRegister(scheduleInfo);
+
+ // 2. test for cron schedule
+ scheduleInfo = genDefaultCronScheduleInfo();
+ testRegister(scheduleInfo);
+ }
+
+ private void testRegister(ScheduleInfo scheduleInfo) {
+ // register schedule info
+ dolphinScheduleEngine.handleRegister(scheduleInfo);
+ assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size());
+ }
+
+ @Test
+ @Order(2)
+ @Timeout(30)
+ public void testUnRegisterScheduleInfo() {
+ // 1. test for normal schedule
+ ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+ testUnRegister(scheduleInfo);
+
+ // 2. test for cron schedule, gen cron schedule info, */2 * * * * ?
+ scheduleInfo = genDefaultCronScheduleInfo();
+ testUnRegister(scheduleInfo);
+ }
+
+ private void testUnRegister(ScheduleInfo scheduleInfo) {
+ // register schedule info
+ dolphinScheduleEngine.handleRegister(scheduleInfo);
+ assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size());
+
+ // Un-register schedule info
+
dolphinScheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId());
+ assertEquals(0, dolphinScheduleEngine.getScheduledProcessMap().size());
+ }
+
+ @Test
+ @Order(3)
+ @Timeout(30)
+ public void testUpdateScheduleInfo() {
+ // 1. test for normal schedule
+ ScheduleInfo scheduleInfo = genDefaultScheduleInfo();
+ testRegister(scheduleInfo);
+
+ // 2. test for cron schedule, gen cron schedule info, */2 * * * * ?
+ scheduleInfo = genDefaultCronScheduleInfo();
+ testUpdate(scheduleInfo);
+ }
+
+ private void testUpdate(ScheduleInfo scheduleInfo) {
+ // register schedule info
+ dolphinScheduleEngine.handleUpdate(scheduleInfo);
+ assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size());
+ }
+}
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java
new file mode 100644
index 0000000000..a2f6d97e0c
--- /dev/null
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.schedule.dolphinscheduler;
+
+public class DolphinSchedulerContainerEnvConstants {
+
+ // DS env image related constants
+ protected static final String DS_IMAGE_NAME =
"apache/dolphinscheduler-standalone-server";
+ protected static final String DS_IMAGE_TAG = "3.2.2";
+ protected static final String INTER_CONTAINER_DS_ALIAS =
"dolphinscheduler";
+
+ // DS env url related constants
+ protected static final String DS_DEFAULT_SERVICE_URL =
"http://127.0.0.1:12345/dolphinscheduler";
+ protected static final String DS_LOGIN_URL = "/login";
+ protected static final String DS_TOKEN_URL = "/access-tokens";
+ protected static final String DS_TOKEN_GEN_URL = "/generate";
+
+ // DS env api params related constants
+ protected static final String DS_USERNAME = "userName";
+ protected static final String DS_PASSWORD = "userPassword";
+ protected static final String DS_USERID = "userId";
+ protected static final String DS_COOKIE = "Cookie";
+ protected static final String DS_COOKIE_SC_TYPE = "securityConfigType";
+ protected static final String DS_COOKIE_SESSION_ID = "sessionId";
+ protected static final String DS_EXPIRE_TIME = "expireTime";
+ protected static final String DS_EXPIRE_TIME_FORMAT = "yyyy-MM-dd
HH:mm:ss";
+
+ // DS env token related constants
+ protected static final String DS_RESPONSE_TOKEN = "token";
+
+ // DS env default admin user info
+ protected static final String DS_DEFAULT_USERNAME = "admin";
+ protected static final String DS_DEFAULT_PASSWORD = "dolphinscheduler123";
+ protected static final Integer DS_DEFAULT_USERID = 1;
+
+}
diff --git
a/inlong-manager/manager-web/src/main/resources/application-dev.properties
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 2bad5f801f..e05c66a674 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -103,11 +103,13 @@ agent.install.temp.path=inlong/agent-installer-temp/
# The primary key id of the default agent module used
default.module.id=1
-# schedule engine type
-# support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
+# Dirty log
dirty.log.clean.enabled=false
dirty.log.clean.interval.minutes=5
dirty.dirty.retention.minutes=10
-dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
+
+# DolphinScheduler related config
+schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
+schedule.engine.dolphinscheduler.token=default_token_value
diff --git
a/inlong-manager/manager-web/src/main/resources/application-prod.properties
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 040c868bcf..7441ea55e6 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -95,11 +95,12 @@ group.deleted.enabled=false
# Tencent cloud log service endpoint, The Operator cls resource by it
cls.manager.endpoint=127.0.0.1
-# schedule engine type
-# support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
-
+# Dirty log
dirty.log.clean.enabled=false
dirty.log.clean.interval.minutes=5
dirty.dirty.retention.minutes=10
-dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
+
+# DolphinScheduler related config
+schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
+schedule.engine.dolphinscheduler.token=default_token_value
diff --git
a/inlong-manager/manager-web/src/main/resources/application-test.properties
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 393eef6b05..f0e42182c5 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -96,11 +96,12 @@ group.deleted.enabled=false
# Tencent cloud log service endpoint, The Operator cls resource by it
cls.manager.endpoint=127.0.0.1
-# schedule engine type
-# support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
-
+# Dirty log
dirty.log.clean.enabled=false
dirty.log.clean.interval.minutes=5
dirty.dirty.retention.minutes=10
-dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
\ No newline at end of file
+dirty.dirty.db.table=inlong_iceberg::dirty_data_achive_iceberg
+
+# DolphinScheduler related config
+schedule.engine.dolphinscheduler.url=http://127.0.0.1:12345/dolphinscheduler
+schedule.engine.dolphinscheduler.token=default_token_value