emptyOVO opened a new pull request, #11468:
URL: https://github.com/apache/inlong/pull/11468
Fixes #11401
### Motivation
* Add a `dolphinscheduler` package in `org.apache.inlong.manager.schedule`
* Add a client and an engine for DolphinScheduler (DS), plus utilities for calling the DS OpenAPI
* Add POJO classes for DS interaction
* Add unit tests, provide an image environment for the test cases, and mock DS so that InLong's scheduling ability can be tested
### Modifications
DS does not provide an official SDK and only exposes an OpenAPI, so all scheduling operations are implemented as HTTP requests:
1. When InLong enables the DS schedule mode, a project is initialized in DS to hold the workflow processes.
2. When InLong starts an offline schedule job for an InLong group, it creates a workflow process definition in that project, scheduled according to the group's schedule info, with a script that periodically sends callback requests to InLong.
3. When the job is unregistered, the process running on DS is taken offline and deleted.
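The register flow above can be sketched as an ordered list of OpenAPI requests; the order follows the `testRegisterScheduleInfo` log in the verification section (generate a task code, create the process definition, release it ONLINE, create the schedule, bring it online). This is a hypothetical sketch, not the PR's client: the `DsRegisterPlan` class is invented for illustration, the endpoint paths are modeled on the DolphinScheduler 3.x REST API and should be checked against the deployed DS version, and the `token` header name is an assumption. In a real client the process definition code and schedule id come from earlier responses; here they are passed in, and requests are only built, never sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: builds (but never sends) the DS OpenAPI requests behind
// one "register". Paths are modeled on the DolphinScheduler 3.x REST API and
// are an assumption; verify them against the DS version you deploy.
final class DsRegisterPlan {

    static List<HttpRequest> registerRequests(String baseUrl, String token,
            long projectCode, long processCode, int scheduleId) {
        String project = baseUrl + "/projects/" + projectCode;
        List<HttpRequest> plan = new ArrayList<>();
        // 1. Generate a task code for the callback task.
        plan.add(get(project + "/task-definition/gen-task-codes?genNum=1", token));
        // 2. Create the process definition that runs the callback script.
        plan.add(post(project + "/process-definition", token));
        // 3. Release the definition so it can be scheduled.
        plan.add(post(project + "/process-definition/" + processCode
                + "/release?releaseState=ONLINE", token));
        // 4. Create a schedule from the InLong schedule info.
        plan.add(post(project + "/schedules", token));
        // 5. Bring the schedule online.
        plan.add(post(project + "/schedules/" + scheduleId + "/online", token));
        return plan;
    }

    private static HttpRequest get(String url, String token) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("token", token) // assumed: DS reads the access token from a "token" header
                .GET()
                .build();
    }

    private static HttpRequest post(String url, String token) {
        // Real calls carry form parameters; an empty body keeps the sketch short.
        return HttpRequest.newBuilder(URI.create(url))
                .header("token", token)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

Building the requests separately from sending them keeps the call order easy to unit-test without a running DS instance.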
The change also provides some disaster-recovery logic:
1. If InLong shuts down unexpectedly and restarts, the workflow process data is recovered and work continues, preventing both duplicate data generation and data loss.
2. When a group is registered repeatedly, the latest `ScheduleInfo` takes effect, which avoids redundant data.
3. A `ConcurrentHashMap` stores the schedule data to ensure thread safety during start, running, and stop.
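Points 2 and 3 can be illustrated with a minimal sketch, assuming the engine keys its bookkeeping by InLong group id. `ScheduleRegistry` and its `Entry` record are hypothetical stand-ins, not the PR's classes. `ConcurrentHashMap.put` replaces atomically and returns the previous value, so a repeated registration keeps only the latest entry and hands back the stale process definition code that should be deleted on DS.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of schedule bookkeeping keyed by InLong group id.
// Each put/remove on ConcurrentHashMap is atomic, so concurrent calls see a
// consistent map; the DS HTTP calls themselves still need their own coordination.
final class ScheduleRegistry {

    // Stand-in for the real ScheduleInfo plus the DS process definition code.
    record Entry(String groupId, String schedule, long processDefinitionCode) { }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();

    // Latest registration wins; the replaced entry (if any) is returned so the
    // caller can delete its now-stale process definition on DS.
    Optional<Entry> register(Entry latest) {
        return Optional.ofNullable(entries.put(latest.groupId(), latest));
    }

    // Returns the removed entry, or empty if the group was not registered.
    Optional<Entry> unregister(String groupId) {
        return Optional.ofNullable(entries.remove(groupId));
    }

    Optional<Entry> lookup(String groupId) {
        return Optional.ofNullable(entries.get(groupId));
    }
}
```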
How to use:
1. Specify the URL and token in the configuration file:
```properties
# DolphinScheduler related config
# URL format, e.g. http://{ip}:{port}/dolphinscheduler
inlong.schedule.dolphinscheduler.url=
# token used to call the DS OpenAPI
inlong.schedule.dolphinscheduler.token=
```
InLong Manager injects these values at startup via Spring's `@Value` annotation.
2. Configure the `DolphinScheduleEngine` when it is initialized:
```java
DolphinScheduleEngine dolphinScheduleEngine = new DolphinScheduleEngine(
        INLONG_DS_TEST_ADDRESS, INLONG_DS_TEST_PORT,
        INLONG_DS_TEST_USERNAME, INLONG_DS_TEST_PASSWORD,
        DS_URL, DS_TOKEN);
```
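Outside Spring, the same two keys can be read with `java.util.Properties`. The sketch below is a hypothetical plain-Java equivalent of the `@Value` injection described above: the `DsConfig` class is invented for illustration, and the fail-fast check for empty values (as left in the template) is a suggested safeguard, not behavior confirmed by the PR.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical plain-Java equivalent of the @Value injection: reads the two
// DolphinScheduler keys and fails fast if either is left empty.
final class DsConfig {
    static final String URL_KEY = "inlong.schedule.dolphinscheduler.url";
    static final String TOKEN_KEY = "inlong.schedule.dolphinscheduler.token";

    final String url;
    final String token;

    private DsConfig(String url, String token) {
        this.url = url;
        this.token = token;
    }

    static DsConfig load(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String url = require(props, URL_KEY);
        // Drop a trailing slash so API paths such as "/projects" append cleanly.
        if (url.endsWith("/")) {
            url = url.substring(0, url.length() - 1);
        }
        return new DsConfig(url, require(props, TOKEN_KEY));
    }

    private static String require(Properties props, String key) {
        String value = props.getProperty(key, "").trim();
        if (value.isEmpty()) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return value;
    }
}
```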
### Verifying this change


DS URL, for example: `http://{ip}:{port}/dolphinscheduler`
* testRegisterScheduleInfo





The callback requests were successfully sent to InLong, and the related tasks were processed in Flink.

Log output:
```text
2024-11-05 21:43:03.104 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.104 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.111 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296993905792
2024-11-05 21:43:03.154 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296993945728
2024-11-05 21:43:03.167 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.183 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=0, scheduleUnit=S, scheduleInterval=2, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=null, version=null)
2024-11-05 21:43:03.199 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.203 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.204 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.209 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:144 - Process definition exists, process definition id: 124296993945728, deleting...
2024-11-05 21:43:03.237 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994033792
2024-11-05 21:43:03.262 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994056320
2024-11-05 21:43:03.275 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.290 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=1, scheduleUnit=null, scheduleInterval=null, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=*/1 * * * * ?, version=null)
2024-11-05 21:43:03.306 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
```
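The log above shows both schedule shapes: `scheduleType=0` with `scheduleUnit=S, scheduleInterval=2`, and `scheduleType=1` with the Quartz-style crontab `*/1 * * * * ?`. A hypothetical sketch of turning the interval form into the crontab form follows, assuming `S` means seconds. The `IntervalToCron` class and its `Unit` enum are invented for illustration; the PR may convert schedules differently.

```java
// Hypothetical sketch: converts an interval schedule (scheduleType=0 in the log,
// e.g. scheduleUnit=S, scheduleInterval=2) into a Quartz-style crontab like the
// one used for scheduleType=1. Assumes "S" means seconds.
final class IntervalToCron {

    enum Unit { SECOND, MINUTE, HOUR }

    // Quartz field order: second, minute, hour, day-of-month, month, day-of-week.
    static String toCron(Unit unit, int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive: " + interval);
        }
        switch (unit) {
            case SECOND:
                return "*/" + interval + " * * * * ?";
            case MINUTE:
                return "0 */" + interval + " * * * ?";
            case HOUR:
                return "0 0 */" + interval + " * * ?";
            default:
                throw new IllegalArgumentException("unsupported unit: " + unit);
        }
    }
}
```

Under these assumptions, `toCron(Unit.SECOND, 1)` yields `*/1 * * * * ?`, the same expression as the `scheduleType=1` entries. Note that Quartz `*/N` steps restart at each minute (or hour) boundary, so an interval that does not divide 60 fires unevenly: `*/7` fires at seconds 0, 7, ..., 56 and then again at 0.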
* testUnregisterScheduleInfo

Log output:
```text
2024-11-05 21:43:03.314 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.314 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.319 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:144 - Process definition exists, process definition id: 124296994056320, deleting...
2024-11-05 21:43:03.356 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994155648
2024-11-05 21:43:03.368 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994165888
2024-11-05 21:43:03.382 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.397 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=0, scheduleUnit=S, scheduleInterval=2, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=null, version=null)
2024-11-05 21:43:03.412 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.413 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:184 - Dolphin Scheduler handle Unregister begin for test-group
2024-11-05 21:43:03.413 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:185 - Checking process definition id uniqueness...
2024-11-05 21:43:03.417 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:190 - Deleting process definition, process definition id: 124296994165888
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:195 - Process definition deleted
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:198 - Un-registered dolphin schedule info for test-group
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.451 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994252928
2024-11-05 21:43:03.460 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994260096
2024-11-05 21:43:03.475 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.491 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=1, scheduleUnit=null, scheduleInterval=null, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=*/1 * * * * ?, version=null)
2024-11-05 21:43:03.508 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.508 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:184 - Dolphin Scheduler handle Unregister begin for test-group
2024-11-05 21:43:03.508 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:185 - Checking process definition id uniqueness...
2024-11-05 21:43:03.512 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:190 - Deleting process definition, process definition id: 124296994260096
2024-11-05 21:43:03.540 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:195 - Process definition deleted
2024-11-05 21:43:03.540 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:198 - Un-registered dolphin schedule info for test-group
```
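In the unregister log, the engine first checks DS for the group's process definition and then deletes it by id. Looking the definition up on DS, rather than only in local memory, is one way an unregister can still succeed after an InLong restart. A minimal sketch, assuming the lookup is by a name derived from the group id; `UnregisterSketch`, the in-memory `FakeDolphin` that replaces the HTTP calls, and the `inlong_offline_` naming rule are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: unregister asks DS (here an in-memory fake) for the
// group's process definition instead of trusting local memory, so it still
// works after an InLong restart and is a no-op when nothing is registered.
final class UnregisterSketch {

    // Fake of one DS project: process definition name -> code, plus a call log.
    static final class FakeDolphin {
        final Map<String, Long> definitionsByName = new HashMap<>();
        final List<String> calls = new ArrayList<>();

        Optional<Long> findByName(String name) {
            calls.add("query " + name);
            return Optional.ofNullable(definitionsByName.get(name));
        }

        void offline(long code) {
            calls.add("offline " + code);
        }

        void delete(long code) {
            calls.add("delete " + code);
            definitionsByName.values().remove(code);
        }
    }

    // Hypothetical naming rule: one process definition per InLong group.
    static String processName(String groupId) {
        return "inlong_offline_" + groupId;
    }

    // Returns true if a definition was found and removed.
    static boolean unregister(FakeDolphin ds, String groupId) {
        Optional<Long> code = ds.findByName(processName(groupId));
        if (code.isEmpty()) {
            return false;
        }
        ds.offline(code.get()); // stop scheduling before deleting the definition
        ds.delete(code.get());
        return true;
    }
}
```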
* testUpdateScheduleInfo


Log output:
```text
2024-11-05 21:43:03.541 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.541 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.547 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994352256
2024-11-05 21:43:03.554 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994357376
2024-11-05 21:43:03.569 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.586 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=0, scheduleUnit=S, scheduleInterval=2, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=null, version=null)
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:212 - Update dolphin schedule info for test-group
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:184 - Dolphin Scheduler handle Unregister begin for test-group
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:185 - Checking process definition id uniqueness...
2024-11-05 21:43:03.606 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:190 - Deleting process definition, process definition id: 124296994357376
2024-11-05 21:43:03.635 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:195 - Process definition deleted
2024-11-05 21:43:03.635 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:198 - Un-registered dolphin schedule info for test-group
2024-11-05 21:43:03.635 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.636 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.641 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994448512
2024-11-05 21:43:03.648 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994453632
2024-11-05 21:43:03.663 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.678 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=1, scheduleUnit=null, scheduleInterval=null, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=*/1 * * * * ?, version=null)
2024-11-05 21:43:03.695 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
```
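The `testUpdateScheduleInfo` log shows that an update is an unregister of the current process definition followed by a fresh register with the new schedule info. A minimal sketch of that composition, with DS replaced by a counter that hands out codes; `UpdateSketch` and its event strings are hypothetical and only mirror the order seen in the log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: update = unregister the current definition, then
// register again with the new schedule, matching the order in the
// testUpdateScheduleInfo log. A counter stands in for DS-generated codes.
final class UpdateSketch {
    final List<String> events = new ArrayList<>();
    private final Map<String, Long> codesByGroup = new HashMap<>();
    private long nextCode = 1;

    long register(String groupId, String schedule) {
        events.add("register " + groupId);
        Long stale = codesByGroup.remove(groupId);
        if (stale != null) {
            events.add("delete " + stale); // uniqueness check: drop the old definition first
        }
        long code = nextCode++;
        codesByGroup.put(groupId, code);
        events.add("create " + code + " " + schedule);
        return code;
    }

    void unregister(String groupId) {
        events.add("unregister " + groupId);
        Long code = codesByGroup.remove(groupId);
        if (code != null) {
            events.add("delete " + code);
        }
    }

    long update(String groupId, String schedule) {
        events.add("update " + groupId);
        unregister(groupId);
        return register(groupId, schedule);
    }
}
```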
- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as:
*(please describe tests)*
- [ ] This change added tests and can be verified as follows:
### Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs
/ not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up
issue for adding the documentation