This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new eb55831cf0 [INLONG-11483][Manager] Support multiple schedule engine
selection (#11484)
eb55831cf0 is described below
commit eb55831cf0408d4c30a12199a4d6f060d74cc670
Author: AloysZhang <[email protected]>
AuthorDate: Tue Nov 12 17:00:41 2024 +0800
[INLONG-11483][Manager] Support multiple schedule engine selection (#11484)
Co-authored-by: Aloys Zhang <[email protected]>
---
.../inlong/manager/dao/entity/ScheduleEntity.java | 2 ++
.../resources/mappers/ScheduleEntityMapper.xml | 26 ++++++++++++---------
.../manager/dao/mapper/ScheduleEntityTest.java | 3 +++
.../manager/pojo/group/InlongGroupRequest.java | 5 ++++
.../inlong/manager/pojo/schedule/ScheduleInfo.java | 10 +++++---
.../manager/pojo/schedule/ScheduleInfoRequest.java | 10 +++++---
.../manager/schedule/ScheduleClientFactory.java | 6 +----
.../service/schedule/ScheduleOperatorImpl.java | 8 +++----
.../main/resources/h2/apache_inlong_manager.sql | 1 +
.../manager-web/sql/apache_inlong_manager.sql | 1 +
inlong-manager/manager-web/sql/changes-2.1.0.sql | 27 ++++++++++++++++++++++
11 files changed, 73 insertions(+), 26 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
index 6d301703fc..a9798c91f1 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ScheduleEntity.java
@@ -33,6 +33,8 @@ public class ScheduleEntity implements Serializable {
private String inlongGroupId;
// schedule type, support [normal, crontab], 0 for normal and 1 for crontab
private Integer scheduleType;
+ // schedule engine type, support [Quartz, Airflow, DolphinScheduler]
+ private String scheduleEngine;
// time unit for offline task schedule interval, support [month, week,
day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
private String scheduleUnit;
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
index d719aa8988..33d25ad78a 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/ScheduleEntityMapper.xml
@@ -22,6 +22,7 @@
<id column="id" jdbcType="INTEGER" property="id"/>
<result column="inlong_group_id" jdbcType="VARCHAR"
property="inlongGroupId"/>
<result column="schedule_type" jdbcType="INTEGER"
property="scheduleType"/>
+ <result column="schedule_engine" jdbcType="VARCHAR"
property="scheduleEngine"/>
<result column="schedule_unit" jdbcType="VARCHAR"
property="scheduleUnit"/>
<result column="schedule_interval" jdbcType="INTEGER"
property="scheduleInterval"/>
<result column="start_time" jdbcType="TIMESTAMP" property="startTime"/>
@@ -42,25 +43,25 @@
</resultMap>
<sql id="Base_Column_List">
- id, inlong_group_id, schedule_type, schedule_unit, schedule_interval,
start_time,
+ id, inlong_group_id, schedule_type, schedule_engine, schedule_unit,
schedule_interval, start_time,
end_time, delay_time, self_depend, task_parallelism,
crontab_expression,
status, previous_status, is_deleted, creator, modifier,
create_time, modify_time, version
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.ScheduleEntity">
- insert into schedule_config (id, inlong_group_id, schedule_type,
schedule_unit,
- schedule_interval, start_time, end_time,
delay_time,
- self_depend, task_parallelism,
crontab_expression,
+ insert into schedule_config (id, inlong_group_id, schedule_type,
schedule_engine,
+ schedule_unit, schedule_interval,
start_time, end_time,
+ delay_time, self_depend,
task_parallelism, crontab_expression,
status, previous_status, creator,
modifier)
values (#{id, jdbcType=INTEGER}, #{inlongGroupId, jdbcType=VARCHAR},
- #{scheduleType, jdbcType=INTEGER}, #{scheduleUnit,
jdbcType=VARCHAR},
- #{scheduleInterval, jdbcType=INTEGER}, #{startTime,
jdbcType=TIMESTAMP},
- #{endTime, jdbcType=TIMESTAMP}, #{delayTime, jdbcType=INTEGER},
- #{selfDepend, jdbcType=INTEGER}, #{taskParallelism,
jdbcType=INTEGER},
- #{crontabExpression, jdbcType=VARCHAR},
#{status,jdbcType=INTEGER},
- #{previousStatus,jdbcType=INTEGER},
#{creator,jdbcType=VARCHAR},
- #{modifier,jdbcType=VARCHAR})
+ #{scheduleType, jdbcType=INTEGER}, #{scheduleEngine,
jdbcType=VARCHAR},
+ #{scheduleUnit, jdbcType=VARCHAR}, #{scheduleInterval,
jdbcType=INTEGER},
+ #{startTime, jdbcType=TIMESTAMP}, #{endTime,
jdbcType=TIMESTAMP},
+ #{delayTime, jdbcType=INTEGER}, #{selfDepend,
jdbcType=INTEGER},
+ #{taskParallelism, jdbcType=INTEGER}, #{crontabExpression,
jdbcType=VARCHAR},
+ #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
+ #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
</insert>
<select id="selectByPrimaryKey" parameterType="java.lang.Integer"
resultMap="BaseResultMap">
@@ -88,6 +89,9 @@
<if test="scheduleType != null">
schedule_type = #{scheduleType, jdbcType=INTEGER},
</if>
+ <if test="scheduleEngine != null">
+ schedule_engine = #{scheduleEngine, jdbcType=VARCHAR},
+ </if>
<if test="scheduleUnit !=null">
schedule_unit = #{scheduleUnit, jdbcType=VARCHAR},
</if>
diff --git
a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
index ef4207bc84..6d2cdaa661 100644
---
a/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
+++
b/inlong-manager/manager-dao/src/test/java/org/apache/inlong/manager/dao/mapper/ScheduleEntityTest.java
@@ -32,6 +32,7 @@ public class ScheduleEntityTest extends DaoBaseTest {
public static final String GROUP_ID_PREFIX = "test_group_";
public static final String USER = "admin";
public static final int SCHEDULE_TYPE = 0;
+ public static final String SCHEDULE_ENGINE = "Quartz";
public static final int SCHEDULE_TYPE_NEW = 1;
public static final String SCHEDULE_UNIT = "H";
public static final String SCHEDULE_UNIT_NEW = "D";
@@ -63,6 +64,7 @@ public class ScheduleEntityTest extends DaoBaseTest {
ScheduleEntity entityQueried =
scheduleEntityMapper.selectByGroupId(scheduleEntity.getInlongGroupId());
Assertions.assertEquals(scheduleEntity.getInlongGroupId(),
entityQueried.getInlongGroupId());
Assertions.assertEquals(SCHEDULE_TYPE,
entityQueried.getScheduleType());
+ Assertions.assertEquals(SCHEDULE_ENGINE,
entityQueried.getScheduleEngine());
Assertions.assertEquals(SCHEDULE_UNIT,
entityQueried.getScheduleUnit());
Assertions.assertEquals(SCHEDULE_INTERVAL,
entityQueried.getScheduleInterval());
Assertions.assertEquals(DEFAULT_TIME, entityQueried.getStartTime());
@@ -105,6 +107,7 @@ public class ScheduleEntityTest extends DaoBaseTest {
ScheduleEntity entity = new ScheduleEntity();
entity.setInlongGroupId(GROUP_ID_PREFIX + System.currentTimeMillis());
entity.setScheduleType(SCHEDULE_TYPE);
+ entity.setScheduleEngine(SCHEDULE_ENGINE);
entity.setScheduleUnit(SCHEDULE_UNIT);
entity.setScheduleInterval(SCHEDULE_INTERVAL);
entity.setStartTime(DEFAULT_TIME);
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index 1749d1427c..194bf35454 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -135,6 +135,11 @@ public abstract class InlongGroupRequest extends
BaseInlongGroup {
@ApiModelProperty("Schedule type")
private Integer scheduleType;
+ // schedule engine type, support [Quartz, Airflow, DolphinScheduler]
+ @ApiModelProperty(value = "Schedule engine, support Quartz, Airflow and
DolphinScheduler")
+ @Length(min = 1, max = 20, message = "length must be between 1 and 20")
+ private String scheduleEngine;
+
// time unit for offline task schedule interval, support [month, week,
day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
@ApiModelProperty("TimeUnit for schedule interval")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
index bb4fb2ca41..b6527cf308 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
@@ -50,6 +50,10 @@ public class ScheduleInfo {
@ApiModelProperty("Schedule type")
private Integer scheduleType;
+ // schedule engine type, support [Quartz, Airflow, DolphinScheduler]
+ @ApiModelProperty("Schedule engine")
+ private String scheduleEngine;
+
// time unit for offline task schedule interval, support [month, week,
day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
@ApiModelProperty("TimeUnit for schedule interval")
@@ -91,6 +95,7 @@ public class ScheduleInfo {
ScheduleInfo that = (ScheduleInfo) o;
return Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType)
+ && Objects.equals(scheduleEngine, that.scheduleEngine)
&& Objects.equals(scheduleUnit, that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
&& Objects.equals(startTime, that.startTime)
@@ -103,9 +108,8 @@ public class ScheduleInfo {
@Override
public int hashCode() {
- return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit,
scheduleInterval, startTime, endTime,
- delayTime,
- selfDepend, taskParallelism, crontabExpression, version);
+ return Objects.hash(id, inlongGroupId, scheduleType, scheduleEngine,
scheduleUnit, scheduleInterval, startTime,
+ endTime, delayTime, selfDepend, taskParallelism,
crontabExpression, version);
}
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
index bf6fb298bf..882b490cf1 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
@@ -44,6 +44,10 @@ public class ScheduleInfoRequest {
@ApiModelProperty("Schedule type")
private Integer scheduleType;
+ // schedule engine type, support [Quartz, Airflow, DolphinScheduler]
+ @ApiModelProperty(value = "Schedule engine")
+ private String scheduleEngine;
+
// time unit for offline task schedule interval, support [month, week,
day, hour, minute, oneround]
// Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround
@ApiModelProperty("TimeUnit for schedule interval")
@@ -85,6 +89,7 @@ public class ScheduleInfoRequest {
ScheduleInfoRequest that = (ScheduleInfoRequest) o;
return Objects.equals(inlongGroupId, that.inlongGroupId)
&& Objects.equals(scheduleType, that.scheduleType)
+ && Objects.equals(scheduleEngine, that.scheduleEngine)
&& Objects.equals(scheduleUnit, that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
&& Objects.equals(startTime, that.startTime)
@@ -97,8 +102,7 @@ public class ScheduleInfoRequest {
@Override
public int hashCode() {
- return Objects.hash(id, inlongGroupId, scheduleType, scheduleUnit,
scheduleInterval, startTime, endTime,
- delayTime,
- selfDepend, taskParallelism, crontabExpression, version);
+ return Objects.hash(id, inlongGroupId, scheduleType, scheduleEngine,
scheduleUnit, scheduleInterval,
+ startTime, endTime, delayTime, selfDepend, taskParallelism,
crontabExpression, version);
}
}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java
index 13f87b3c45..26570fa36b 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleClientFactory.java
@@ -23,7 +23,6 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.List;
@@ -34,13 +33,10 @@ public class ScheduleClientFactory {
private static final Logger LOGGER =
LoggerFactory.getLogger(ScheduleClientFactory.class);
- @Value("${inlong.schedule.engine:none}")
- private String scheduleEngineName;
-
@Autowired
List<ScheduleEngineClient> scheduleEngineClients;
- public ScheduleEngineClient getInstance() {
+ public ScheduleEngineClient getInstance(String scheduleEngineName) {
Optional<ScheduleEngineClient> optScheduleClient =
scheduleEngineClients.stream().filter(t ->
t.accept(scheduleEngineName)).findFirst();
if (!optScheduleClient.isPresent()) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
index 61f847f244..6ef899ab9e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
@@ -88,9 +88,9 @@ public class ScheduleOperatorImpl implements ScheduleOperator
{
}
}
- private ScheduleEngineClient getScheduleEngineClient() {
+ private ScheduleEngineClient getScheduleEngineClient(String
scheduleEngine) {
if (scheduleEngineClient == null) {
- scheduleEngineClient = scheduleClientFactory.getInstance();
+ scheduleEngineClient =
scheduleClientFactory.getInstance(scheduleEngine);
}
return scheduleEngineClient;
}
@@ -143,8 +143,8 @@ public class ScheduleOperatorImpl implements
ScheduleOperator {
* */
private Boolean registerToScheduleEngine(ScheduleInfo scheduleInfo, String
operator, boolean isUpdate) {
// update(un-register and then register) or register
- boolean res = isUpdate ? getScheduleEngineClient().update(scheduleInfo)
- : getScheduleEngineClient().register(scheduleInfo);
+ boolean res = isUpdate ?
getScheduleEngineClient(scheduleInfo.getScheduleEngine()).update(scheduleInfo)
+ :
getScheduleEngineClient(scheduleInfo.getScheduleEngine()).register(scheduleInfo);
// update status to REGISTERED
scheduleService.updateStatus(scheduleInfo.getInlongGroupId(),
REGISTERED, operator);
LOGGER.info("{} schedule info success for group {}",
diff --git
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index bfaa28b1cd..1d92f14f46 100644
---
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -992,6 +992,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id,
undeleted ones cannot be repeated',
`schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT
'Schedule type, 0 for normal, 1 for crontab',
+ `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT
'Schedule engine, support Quartz, Airflow and DolphinScheduler',
`schedule_unit` varchar(64) DEFAULT NULL COMMENT 'Schedule unit,
Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround',
`schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule
interval',
`start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Start time for schedule',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 8697d3c0e7..58259c9047 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1045,6 +1045,7 @@ CREATE TABLE IF NOT EXISTS `schedule_config`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id,
undeleted ones cannot be repeated',
`schedule_type` int(4) NOT NULL DEFAULT '0' COMMENT
'Schedule type, 0 for normal, 1 for crontab',
+ `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz' COMMENT
'Schedule engine, support Quartz, Airflow and DolphinScheduler',
`schedule_unit` varchar(64) DEFAULT NULL COMMENT 'Schedule unit,
Y=year, M=month, W=week, D=day, H=hour, I=minute, O=oneround',
`schedule_interval` int(11) DEFAULT '1' COMMENT 'Schedule
interval',
`start_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
COMMENT 'Start time for schedule',
diff --git a/inlong-manager/manager-web/sql/changes-2.1.0.sql
b/inlong-manager/manager-web/sql/changes-2.1.0.sql
new file mode 100644
index 0000000000..0d4c984778
--- /dev/null
+++ b/inlong-manager/manager-web/sql/changes-2.1.0.sql
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is the SQL change file for upgrading to the current version 2.1.0.
+-- When upgrading to version 2.1.0, please execute these SQL statements in the DB (such
as MySQL) used by the Manager module.
+
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+
+USE `apache_inlong_manager`;
+
+ALTER TABLE `schedule_config`
+ ADD COLUMN `schedule_engine` varchar(64) NOT NULL DEFAULT 'Quartz'
COMMENT 'Schedule engine, support Quartz, Airflow and DolphinScheduler';