This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e42f5ec276 [INLONG-9873][Manager] Support adding data add tasks for
file collection (#9874)
e42f5ec276 is described below
commit e42f5ec27688089eb50ae48695c8fe39684eefb7
Author: fuweng11 <[email protected]>
AuthorDate: Thu Mar 28 21:42:59 2024 +0800
[INLONG-9873][Manager] Support adding data add tasks for file collection
(#9874)
* [INLONG-9873][Manager] Support adding supplementary recording tasks for
file collection
* [INLONG-9873][Manager] Fix error
* [INLONG-9873][Manager] Fix error
* [INLONG-9873][Manager] Fix error
* [INLONG-9873][Manager] Fix error
* [INLONG-9873][Manager] Fix error
* [INLONG-9873][Manager] Fix code style
* [INLONG-9873][Manager] Fix error
* [INLONG-9873][Manager] Fix error
* [INLONG-9873][Manager] Fix error
* [INLONG-9873][Manager] Fix error
---
.../manager/client/api/InlongGroupContext.java | 2 +-
.../manager/dao/entity/StreamSourceEntity.java | 4 +-
.../dao/mapper/StreamSourceEntityMapper.java | 15 +++++---
.../resources/mappers/StreamSourceEntityMapper.xml | 37 +++++++++++-------
.../{SubSourceDTO.java => DataAddTaskDTO.java} | 16 ++++----
.../manager/pojo/source/DataAddTaskRequest.java | 44 ++++++++++++++++++++++
.../inlong/manager/pojo/source/SourceRequest.java | 4 +-
.../inlong/manager/pojo/source/StreamSource.java | 8 ++--
.../pojo/source/file/FileDataAddTaskRequest.java} | 27 ++++++++++---
.../manager/pojo/source/pulsar/PulsarSource.java | 3 ++
.../pojo/source/pulsar/PulsarSourceDTO.java | 3 ++
.../service/core/impl/AgentServiceImpl.java | 33 +++++++++++++---
.../source/AbstractSourceOperateListener.java | 2 +-
.../listener/source/SourceRestartListener.java | 2 +-
.../listener/source/SourceStopListener.java | 2 +-
.../service/source/AbstractSourceOperator.java | 7 ++++
.../service/source/StreamSourceOperator.java | 10 +++++
.../service/source/StreamSourceService.java | 10 +++++
.../service/source/StreamSourceServiceImpl.java | 15 +++++++-
.../service/source/file/FileSourceOperator.java | 43 +++++++++++++++++++--
.../manager/service/task/DataCleansingTask.java | 8 ++--
.../service/task/DeleteStreamSourceTask.java | 2 +-
.../service/core/impl/AgentServiceTest.java | 8 ++--
.../main/resources/h2/apache_inlong_manager.sql | 4 +-
.../manager-web/sql/apache_inlong_manager.sql | 4 +-
inlong-manager/manager-web/sql/changes-1.12.0.sql | 3 ++
.../web/controller/StreamSourceController.java | 7 ++++
.../src/main/resources/application-dev.properties | 21 +++++++----
.../src/main/resources/application-prod.properties | 21 +++++++----
.../src/main/resources/application-test.properties | 21 +++++++----
30 files changed, 294 insertions(+), 92 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index 8600ecdafa..fe2c5ed04c 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -102,7 +102,7 @@ public class InlongGroupContext implements Serializable {
StreamSource source = entry.getValue();
// when template id is null it is considered as normal
source other than template source
// sub sources are filtered because they are already
collected in template source's sub source list
- if (source != null && source.getTemplateId() == null) {
+ if (source != null && source.getTaskMapId() == null) {
groupSources.add(source);
}
}
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index 4b28525333..b577fd48da 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -37,7 +37,7 @@ public class StreamSourceEntity implements Serializable {
private String inlongStreamId;
private String sourceType;
private String sourceName;
- private Integer templateId;
+ private Integer taskMapId;
private String agentIp;
private String uuid;
@@ -74,7 +74,7 @@ public class StreamSourceEntity implements Serializable {
+ ", inlongStreamId='" + inlongStreamId + '\''
+ ", sourceType='" + sourceType + '\''
+ ", sourceName='" + sourceName + '\''
- + ", templateId=" + templateId
+ + ", templateId=" + taskMapId
+ ", agentIp='" + agentIp + '\''
+ ", uuid='" + uuid + '\''
+ ", dataNodeName='" + dataNodeName + '\''
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index ca984fb04b..53d28bbbab 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -43,13 +43,13 @@ public interface StreamSourceEntityMapper {
StreamSourceEntity selectForAgentTask(Integer id);
/**
- * Select one sub source by template id and agent ip.
+ * Select one data add task by task map id and agent ip.
*
- * @param templateId template id
+ * @param taskMapId template id
* @param agentIp agent ip
* @return stream source info
*/
- StreamSourceEntity selectOneByTemplatedIdAndAgentIp(@Param("templateId")
Integer templateId,
+ StreamSourceEntity selectOneByTaskMapIdAndAgentIp(@Param("taskMapId")
Integer taskMapId,
@Param("agentIp") String agentIp);
/**
@@ -111,9 +111,9 @@ public interface StreamSourceEntityMapper {
List<StreamSourceEntity> selectByGroupIds(@Param("groupIdList")
List<String> groupIdList);
/**
- * Select all sub sources by template id
+ * Select all data add task by task map id
*/
- List<StreamSourceEntity> selectByTemplateId(@Param("templateId") Integer
templateId);
+ List<StreamSourceEntity> selectByTaskMapId(@Param("taskMapId") Integer
taskMapId);
/**
* Get the distinct source type from the given groupId and streamId
@@ -190,6 +190,11 @@ public interface StreamSourceEntityMapper {
*/
void updateStatusByDeleted();
+ /**
+ * Logic delete the data add task by modifiy time
+ */
+ void logicalDeleteByTimeout(@Param("retentionDays") Integer retentionDays);
+
int logicalDeleteByRelatedId(@Param("groupId") String groupId,
@Param("streamId") String streamId,
@Param("status") Integer status);
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 59d97d8a7f..c1101ed2f6 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -26,7 +26,7 @@
<result column="inlong_stream_id" jdbcType="VARCHAR"
property="inlongStreamId"/>
<result column="source_type" jdbcType="VARCHAR" property="sourceType"/>
<result column="source_name" jdbcType="VARCHAR" property="sourceName"/>
- <result column="template_id" jdbcType="INTEGER" property="templateId"/>
+ <result column="task_map_id" jdbcType="INTEGER" property="taskMapId"/>
<result column="agent_ip" jdbcType="VARCHAR" property="agentIp"/>
<result column="uuid" jdbcType="VARCHAR" property="uuid"/>
<result column="data_node_name" jdbcType="VARCHAR"
property="dataNodeName"/>
@@ -47,7 +47,7 @@
<result column="modify_time" jdbcType="TIMESTAMP"
property="modifyTime"/>
</resultMap>
<sql id="Base_Column_List">
- id, inlong_group_id, inlong_stream_id, source_type, source_name,
template_id, agent_ip, uuid,
+ id, inlong_group_id, inlong_stream_id, source_type, source_name,
task_map_id, agent_ip, uuid,
data_node_name, inlong_cluster_name, inlong_cluster_node_group,
serialization_type, snapshot, report_time,
data_time_zone, ext_params, version, status, previous_status,
is_deleted, creator, modifier, create_time, modify_time
</sql>
@@ -55,13 +55,13 @@
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
insert into stream_source (inlong_group_id, inlong_stream_id,
- source_type, source_name, template_id,
agent_ip,
+ source_type, source_name, task_map_id,
agent_ip,
uuid, data_node_name, inlong_cluster_name,
inlong_cluster_node_group,
serialization_type, snapshot, report_time,
data_time_zone, ext_params, status,
previous_status, creator, modifier)
values (#{inlongGroupId,jdbcType=VARCHAR},
#{inlongStreamId,jdbcType=VARCHAR},
- #{sourceType,jdbcType=VARCHAR},
#{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER},
+ #{sourceType,jdbcType=VARCHAR},
#{sourceName,jdbcType=VARCHAR}, #{taskMapId,jdbcType=INTEGER},
#{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR},
#{dataNodeName,jdbcType=VARCHAR},
#{inlongClusterName,jdbcType=VARCHAR},
#{inlongClusterNodeGroup,jdbcType=VARCHAR},
#{serializationType,jdbcType=VARCHAR},
#{snapshot,jdbcType=LONGVARCHAR},#{modifyTime,jdbcType=TIMESTAMP},
@@ -76,11 +76,11 @@
where id = #{id,jdbcType=INTEGER}
and is_deleted = 0
</select>
- <select id="selectByTemplateId"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ <select id="selectByTaskMapId"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
from stream_source
- where template_id = #{templateId,jdbcType=INTEGER}
+ where task_map_id = #{taskMapId,jdbcType=INTEGER}
and is_deleted = 0
</select>
<select id="selectByIdForUpdate"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
@@ -98,11 +98,11 @@
where id = #{id,jdbcType=INTEGER}
for update
</select>
- <select id="selectOneByTemplatedIdAndAgentIp"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ <select id="selectOneByTaskMapIdAndAgentIp"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
from stream_source
- where template_id = #{templateId,jdbcType=INTEGER}
+ where task_map_id = #{taskMapId,jdbcType=INTEGER}
and agent_ip = #{agentIp, jdbcType=VARCHAR}
and is_deleted = 0
limit 1
@@ -159,7 +159,7 @@
#{status}
</foreach>
</if>
- and template_id is NULL
+ and task_map_id is NULL
</where>
<choose>
<when test="request.orderField != null and request.orderField !=
'' and request.orderType != null and request.orderType != ''">
@@ -271,7 +271,7 @@
</foreach>
</if>
and agent_ip is NULL
- and template_id is NULL
+ and task_map_id is NULL
and inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
</where>
</select>
@@ -303,7 +303,7 @@
#{item}
</foreach>
</if>
- and template_id is NULL
+ and task_map_id is NULL
</where>
</select>
<select id="selectSourceType" resultType="java.lang.String">
@@ -549,7 +549,18 @@
and status not in (99, 201, 301)
</where>
</update>
-
+ <update id="logicalDeleteByTimeout">
+ update stream_source
+ <set>
+ is_deleted = id,
+ status = 99
+ </set>
+ <where>
+ is_deleted = 0
+ and task_map_id is not null
+ and modify_time <= DATE_ADD(NOW(), INTERVAL -#{retentionDays,
jdbcType=INTEGER} DAY)
+ </where>
+ </update>
<update id="logicalDeleteByRelatedId">
update stream_source
<set>
@@ -586,7 +597,7 @@
</set>
where is_deleted = 0
and agent_ip = #{agentIp, jdbcType=VARCHAR}
- and template_id is not null
+ and task_map_id is not null
<if test="targetStatus != null">
and status = #{targetStatus, jdbcType=INTEGER}
</if>
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskDTO.java
similarity index 79%
rename from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
rename to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskDTO.java
index 50d46d6e8a..544627db34 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SubSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskDTO.java
@@ -30,29 +30,29 @@ import lombok.NoArgsConstructor;
import javax.validation.constraints.NotNull;
/**
- * Sub source information data per agent
+ * Data add task information
*/
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
-public class SubSourceDTO {
+public class DataAddTaskDTO {
@ApiModelProperty("stream source id")
private Integer id;
- @ApiModelProperty("Template source id this sub source belongs to")
- private Integer templateId;
+ @ApiModelProperty("Main source id this data add task belongs to")
+ private Integer taskMapId;
- @ApiModelProperty("Agent ip of sub source")
+ @ApiModelProperty("Agent ip of data add task")
private String agentIp;
- @ApiModelProperty("Status of sub source")
+ @ApiModelProperty("Status of data add task")
private Integer status;
- public static SubSourceDTO getFromJson(@NotNull String extParams) {
+ public static DataAddTaskDTO getFromJson(@NotNull String extParams) {
try {
- return JsonUtils.parseObject(extParams, SubSourceDTO.class);
+ return JsonUtils.parseObject(extParams, DataAddTaskDTO.class);
} catch (Exception e) {
throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
new file mode 100644
index 0000000000..521551825a
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import org.hibernate.validator.constraints.Length;
+
+import javax.validation.constraints.NotBlank;
+
+/**
+ * Data add task information
+ */
+@Data
+@ApiModel("Data add task request")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property =
"sourceType")
+public class DataAddTaskRequest {
+
+ @ApiModelProperty(value = "Source ID")
+ private Integer sourceId;
+
+ @ApiModelProperty("Source type, including: FILE, KAFKA, etc.")
+ @NotBlank(message = "sourceType cannot be blank")
+ @Length(min = 1, max = 20, message = "length must be between 1 and 20")
+ private String sourceType;
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 0bcfade77b..6a31f9039f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -118,7 +118,7 @@ public class SourceRequest {
private Map<String, Object> properties = new LinkedHashMap<>();
@JsonIgnore
- @ApiModelProperty("Sub source information of existing agents")
- private List<SubSourceDTO> subSourceList;
+ @ApiModelProperty("Data add task information of existing agents")
+ private List<DataAddTaskDTO> dataAddTaskList;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
index 9793189d26..cc56ecb6a7 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/StreamSource.java
@@ -111,11 +111,11 @@ public abstract class StreamSource extends StreamNode {
@ApiModelProperty("Properties for source")
private Map<String, Object> properties = new LinkedHashMap<>();
- @ApiModelProperty("Null if not a sub source")
- private Integer templateId;
+ @ApiModelProperty("Null if not a data add task")
+ private Integer taskMapId;
- @ApiModelProperty("Sub source information of existing agents")
- private List<SubSourceDTO> subSourceList;
+ @ApiModelProperty("Data add task information of existing agents")
+ private List<DataAddTaskDTO> dataAddTaskList;
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value, true as default")
private Boolean ignoreParseError;
diff --git a/inlong-manager/manager-web/sql/changes-1.12.0.sql
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
similarity index 51%
copy from inlong-manager/manager-web/sql/changes-1.12.0.sql
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
index 82e8af66e5..bcf292c1f3 100644
--- a/inlong-manager/manager-web/sql/changes-1.12.0.sql
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java
@@ -15,14 +15,29 @@
* limitations under the License.
*/
--- This is the SQL change file from version 1.9.0 to the current version
1.10.0.
--- When upgrading to version 1.10.0, please execute those SQLs in the DB (such
as MySQL) used by the Manager module.
+package org.apache.inlong.manager.pojo.source.file;
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
-USE `apache_inlong_manager`;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
-ALTER TABLE `stream_source` ADD COLUMN `data_time_zone` varchar(256) DEFAULT
NULL COMMENT 'Data time zone';
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = SourceType.FILE)
+@ApiModel(value = "File data add task request")
+public class FileDataAddTaskRequest extends DataAddTaskRequest {
+ @ApiModelProperty("Start time")
+ private Long startTime;
+ @ApiModelProperty("End time")
+ private Long endTime;
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
index cdab4d59cf..8c14d8118b 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
@@ -88,6 +88,9 @@ public class PulsarSource extends StreamSource {
@Builder.Default
private String wrapType = MessageWrapType.INLONG_MSG_V0.getName();
+ @ApiModelProperty("Reset subscription time")
+ private Long resetTime;
+
public PulsarSource() {
this.setSourceType(SourceType.PULSAR);
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
index 6fc1751d72..6c0ba66208 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
@@ -80,6 +80,9 @@ public class PulsarSourceDTO {
@ApiModelProperty(value = "The message body wrap wrap type, including:
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;
+ @ApiModelProperty("Reset subscription time")
+ private Long resetTime;
+
@ApiModelProperty("Properties for Pulsar")
private Map<String, Object> properties;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 953df33e96..3fa86b02fe 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -136,10 +136,17 @@ public class AgentServiceImpl implements AgentService {
private Integer beforeSeconds;
@Value("${source.update.interval:60}")
private Integer updateTaskInterval;
- @Value("${source.cleansing.enabled:false}")
+ @Value("${source.clean.enabled:false}")
private Boolean sourceCleanEnabled;
- @Value("${source.cleansing.interval:600}")
+ @Value("${source.clean.interval.seconds:600}")
private Integer cleanInterval;
+ @Value("${add.task.clean.enabled:false}")
+ private Boolean dataAddTaskCleanEnabled;
+ @Value("${add.task.clean.interval.seconds:10}")
+ private Integer dataAddTaskCleanInterval;
+ @Value("${add.task.retention.days:7}")
+ private Integer retentionDays;
+
@Autowired
private StreamSourceEntityMapper sourceMapper;
@Autowired
@@ -202,6 +209,22 @@ public class AgentServiceImpl implements AgentService {
}, 0, cleanInterval, TimeUnit.SECONDS);
LOGGER.info("clean task started successfully");
}
+ if (dataAddTaskCleanEnabled) {
+ ThreadFactory factory = new ThreadFactoryBuilder()
+ .setNameFormat("scheduled-subSource-deleted-%d")
+ .setDaemon(true)
+ .build();
+ ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(factory);
+ executor.scheduleWithFixedDelay(() -> {
+ try {
+ sourceMapper.logicalDeleteByTimeout(retentionDays);
+ LOGGER.info("clean sub task successfully");
+ } catch (Throwable t) {
+ LOGGER.error("clean sub task error", t);
+ }
+ }, 0, dataAddTaskCleanInterval, TimeUnit.SECONDS);
+ LOGGER.info("clean sub task started successfully");
+ }
}
@Override
@@ -441,7 +464,7 @@ public class AgentServiceImpl implements AgentService {
/**
* Add subtasks to template tasks.
- * (Template task are agent_ip is null and template_id is null)
+ * (Template task are agent_ip is null and task_map_id is null)
*/
private void preProcessTemplateFileTask(TaskRequest taskRequest) {
List<Integer> needCopiedStatusList =
Arrays.asList(SourceStatus.TO_BE_ISSUED_ADD.getCode(),
@@ -463,7 +486,7 @@ public class AgentServiceImpl implements AgentService {
if (groupEntity != null &&
noNeedAddTask.contains(GroupStatus.forCode(groupEntity.getStatus()))) {
return;
}
- StreamSourceEntity subSource =
sourceMapper.selectOneByTemplatedIdAndAgentIp(sourceEntity.getId(),
+ StreamSourceEntity subSource =
sourceMapper.selectOneByTaskMapIdAndAgentIp(sourceEntity.getId(),
agentIp);
if (subSource == null) {
InlongClusterNodeEntity clusterNodeEntity =
selectByIpAndCluster(agentClusterName, agentIp);
@@ -474,7 +497,7 @@ public class AgentServiceImpl implements AgentService {
CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
fileEntity.setSourceName(fileEntity.getSourceName() + "-"
+
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
- fileEntity.setTemplateId(sourceEntity.getId());
+ fileEntity.setTaskMapId(sourceEntity.getId());
fileEntity.setAgentIp(agentIp);
fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
// create new sub source task
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index cb955decd9..ccefd5cf10 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -106,7 +106,7 @@ public abstract class AbstractSourceOperateListener
implements SourceOperateList
// template sources are filtered and processed in corresponding
subclass listeners
if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus ==
SourceStatus.SOURCE_STOP
|| sourceStatus == SourceStatus.HEARTBEAT_TIMEOUT
- ||
CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
+ ||
CollectionUtils.isNotEmpty(streamSource.getDataAddTaskList())) {
return true;
} else if (sourceStatus == SourceStatus.SOURCE_FAILED ||
sourceStatus == SourceStatus.SOURCE_DISABLE) {
return false;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
index 7575d8c25a..238b6bc1bc 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
@@ -51,7 +51,7 @@ public class SourceRestartListener extends
AbstractSourceOperateListener {
public void operateStreamSource(SourceRequest sourceRequest, String
operator) {
// if a source has sub-sources, it is considered a template source.
// template sources do not need to be restarted, its sub-sources will
be processed in this method later.
- if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) {
+ if (CollectionUtils.isNotEmpty(sourceRequest.getDataAddTaskList())) {
return;
}
streamSourceService.restart(sourceRequest.getId(), operator);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
index e3636dcb89..e1328bf6e2 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
@@ -51,7 +51,7 @@ public class SourceStopListener extends
AbstractSourceOperateListener {
public void operateStreamSource(SourceRequest sourceRequest, String
operator) {
// if a source has sub-sources, it is considered a template source.
// template sources do not need to be stopped, its sub-sources will be
processed in this method later.
- if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) {
+ if (CollectionUtils.isNotEmpty(sourceRequest.getDataAddTaskList())) {
return;
}
streamSourceService.stop(sourceRequest.getId(), operator);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 00f85052fd..9428101c40 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -32,6 +32,7 @@ import
org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -338,4 +339,10 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
public void syncSourceFieldInfo(SourceRequest request, String operator) {
LOGGER.info("not support sync source field info for type ={}",
request.getSourceType());
}
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ)
+ public Integer addDataAddTask(DataAddTaskRequest request, String operator)
{
+ throw new BusinessException(String.format("not support data add task
for type =%s", request.getSourceType()));
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index 5e7168879b..997f39b867 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -126,4 +127,13 @@ public interface StreamSourceOperator {
*/
void syncSourceFieldInfo(SourceRequest request, String operator);
+ /**
+ * Save the data add task info.
+ *
+ * @param request request of data add task
+ * @param operator name of operator
+ * @return source id after saving
+ */
+ Integer addDataAddTask(DataAddTaskRequest request, String operator);
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 1bcb9f9966..0dd4decd28 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.source;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -229,4 +230,13 @@ public interface StreamSourceService {
return true;
}
+ /**
+ * Save the data add task information
+ *
+ * @param request Source request.
+ * @param operator Operator's name.
+ * @return source id after saving.
+ */
+ Integer addDataAddTask(DataAddTaskRequest request, String operator);
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index a8a01224b8..2d3855b05e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -384,7 +385,7 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.expectNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
- boolean isTemplateSource =
CollectionUtils.isNotEmpty(sourceMapper.selectByTemplateId(id));
+ boolean isTemplateSource =
CollectionUtils.isNotEmpty(sourceMapper.selectByTaskMapId(id));
// Check if it can be delete
InlongGroupEntity groupEntity =
groupMapper.selectByGroupId(entity.getInlongGroupId());
@@ -436,7 +437,7 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
String.format("InlongGroup does not exist with
InlongGroupId=%s", entity.getInlongGroupId()));
}
// check record status
- boolean isTemplateSource =
CollectionUtils.isNotEmpty(sourceMapper.selectByTemplateId(id));
+ boolean isTemplateSource =
CollectionUtils.isNotEmpty(sourceMapper.selectByTaskMapId(id));
SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
// if source is frozen|failed|new, or if it is a template source or
auto push source, delete directly
@@ -629,4 +630,14 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
request.setInlongStreamId(entity.getInlongStreamId());
request.setSourceName(entity.getSourceName());
}
+
+ @Override
+ public Integer addDataAddTask(DataAddTaskRequest request, String operator)
{
+ LOGGER.info("begin to add data add task info: {}", request);
+ StreamSourceEntity entity =
sourceMapper.selectById(request.getSourceId());
+ StreamSourceOperator sourceOperator =
operatorFactory.getInstance(entity.getSourceType());
+ int id = sourceOperator.addDataAddTask(request, operator);
+ LOGGER.info("success to add data add task info: {}", request);
+ return id;
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index 05b22ae51c..5d4329c7ff 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -23,9 +23,11 @@ import
org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.source.DataAddTaskDTO;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.source.SubSourceDTO;
+import org.apache.inlong.manager.pojo.source.file.FileDataAddTaskRequest;
import org.apache.inlong.manager.pojo.source.file.FileSource;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceRequest;
@@ -33,8 +35,13 @@ import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.source.AbstractSourceOperator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.stream.Collectors;
@@ -45,6 +52,8 @@ import java.util.stream.Collectors;
@Service
public class FileSourceOperator extends AbstractSourceOperator {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileSourceOperator.class);
+
@Autowired
private ObjectMapper objectMapper;
@@ -88,14 +97,40 @@ public class FileSourceOperator extends
AbstractSourceOperator {
List<StreamField> sourceFields = super.getSourceFields(entity.getId());
source.setFieldList(sourceFields);
- List<StreamSourceEntity> subSourceList =
sourceMapper.selectByTemplateId(entity.getId());
- source.setSubSourceList(subSourceList.stream().map(subEntity ->
SubSourceDTO.builder()
+ List<StreamSourceEntity> dataAddTaskList =
sourceMapper.selectByTaskMapId(entity.getId());
+ source.setDataAddTaskList(dataAddTaskList.stream().map(subEntity ->
DataAddTaskDTO.builder()
.id(subEntity.getId())
- .templateId(entity.getId())
+ .taskMapId(entity.getId())
.agentIp(subEntity.getAgentIp())
.status(subEntity.getStatus()).build())
.collect(Collectors.toList()));
return source;
}
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ)
+ public Integer addDataAddTask(DataAddTaskRequest request, String operator)
{
+ FileDataAddTaskRequest sourceRequest = (FileDataAddTaskRequest)
request;
+ StreamSourceEntity sourceEntity =
sourceMapper.selectById(request.getSourceId());
+ try {
+ List<StreamSourceEntity> dataAddTaskList =
sourceMapper.selectByTaskMapId(sourceEntity.getId());
+ int dataAddTaskSize = CollectionUtils.isNotEmpty(dataAddTaskList)
? dataAddTaskList.size() : 0;
+ FileSourceDTO dto =
FileSourceDTO.getFromJson(sourceEntity.getExtParams());
+ dto.setStartTime(sourceRequest.getStartTime());
+ dto.setEndTime(sourceRequest.getEndTime());
+ dto.setRetry(true);
+ StreamSourceEntity dataAddTaskEntity =
+ CommonBeanUtils.copyProperties(sourceEntity,
StreamSourceEntity::new);
+ dataAddTaskEntity.setId(null);
+ dataAddTaskEntity.setSourceName(sourceEntity.getSourceName() + "-"
+ (dataAddTaskSize + 1));
+
dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ dataAddTaskEntity.setTaskMapId(sourceEntity.getId());
+ return sourceMapper.insert(dataAddTaskEntity);
+ } catch (Exception e) {
+ LOGGER.error("serialize extParams of File SourceDTO failure: ", e);
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of File SourceDTO
failure: %s", e.getMessage()));
+ }
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
index 871700c5b3..2758c80ce8 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DataCleansingTask.java
@@ -59,13 +59,13 @@ public class DataCleansingTask extends TimerTask implements
InitializingBean {
*/
private static final int INITIAL_DELAY = 60;
- @Value("${data.cleansing.enabled:false}")
+ @Value("${data.clean.enabled:false}")
private Boolean enabled;
- @Value("${data.cleansing.interval.seconds:1800}")
+ @Value("${data.clean.interval.seconds:1800}")
private Integer interval;
- @Value("${data.cleansing.before.days:10}")
+ @Value("${data.clean.before.days:10}")
private Integer before;
- @Value("${data.cleansing.batchSize:100}")
+ @Value("${data.clean.batchSize:100}")
private Integer batchSize;
@Autowired
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java
index 9d3a83dff9..5e4b4e724e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java
@@ -56,7 +56,7 @@ public class DeleteStreamSourceTask extends TimerTask
implements InitializingBea
@Value("${group.deleted.enabled:false}")
private Boolean enabled;
- @Value("${group.deleted.batchSize:100}")
+ @Value("${group.deleted.batch.size:100}")
private Integer batchSize;
@Value("${group.deleted.latest.hours:10}")
private Integer latestHours;
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index 70034803e8..41acbdfd86 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -141,7 +141,7 @@ class AgentServiceTest extends ServiceBaseTest {
public void suspendSource(String groupId, String streamId) {
List<StreamSource> sources = sourceService.listSource(groupId,
streamId);
sources.stream()
- .filter(source -> source.getTemplateId() != null)
+ .filter(source -> source.getTaskMapId() != null)
.forEach(source -> sourceService.stop(source.getId(),
GLOBAL_OPERATOR));
groupMapper.updateStatus(groupId,
GroupStatus.CONFIGURATION_OFFLINE.getCode(), GLOBAL_OPERATOR);
streamMapper.updateStatusByIdentifier(groupId, streamId,
StreamStatus.SUSPENDED.getCode(), GLOBAL_OPERATOR);
@@ -153,7 +153,7 @@ class AgentServiceTest extends ServiceBaseTest {
public void restartSource(String groupId, String streamId) {
List<StreamSource> sources = sourceService.listSource(groupId,
streamId);
sources.stream()
- .filter(source -> source.getTemplateId() != null)
+ .filter(source -> source.getTaskMapId() != null)
.forEach(source -> sourceService.restart(source.getId(),
GLOBAL_OPERATOR));
groupMapper.updateStatus(groupId,
GroupStatus.CONFIG_SUCCESSFUL.getCode(), GLOBAL_OPERATOR);
streamMapper.updateStatusByIdentifier(groupId, streamId,
StreamStatus.RESTARTED.getCode(), GLOBAL_OPERATOR);
@@ -230,7 +230,7 @@ class AgentServiceTest extends ServiceBaseTest {
agent.pullTask(); // report last success status
final int sourceId = sourceService.listSource(groupStream.getLeft(),
groupStream.getRight()).stream()
- .filter(source -> source.getTemplateId() != null)
+ .filter(source -> source.getTaskMapId() != null)
.findAny()
.get()
.getId();
@@ -256,7 +256,7 @@ class AgentServiceTest extends ServiceBaseTest {
// update group to config success
final String groupId = sourceService.listSource(groupStream.getLeft(),
groupStream.getRight()).stream()
- .filter(source -> source.getTemplateId() != null)
+ .filter(source -> source.getTaskMapId() != null)
.findAny()
.get()
.getInlongGroupId();
diff --git
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index edd9abf88d..1a2d28ad2f 100644
---
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -331,7 +331,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT
'source_name',
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source
type, including: FILE, DB, etc',
- `template_id` int(11) DEFAULT NULL COMMENT 'Id of
the template task this agent belongs to',
+ `task_map_id` int(11) DEFAULT NULL COMMENT 'Id of
the task this agent belongs to',
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of
the agent running the task, NULL if this is a template task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid
of the agent running the task',
`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node
name, which links to data_node table',
@@ -354,7 +354,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`,
`source_name`, `is_deleted`),
INDEX `source_status_index` (`status`, `is_deleted`),
INDEX `source_agent_ip_index` (`agent_ip`, `is_deleted`),
- INDEX `source_template_id_index` (`template_id`)
+ INDEX `source_task_map_id_index` (`task_map_id`)
);
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 025b53190e..5451516614 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -351,7 +351,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT
'source_name',
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source
type, including: FILE, DB, etc',
- `template_id` int(11) DEFAULT NULL COMMENT 'Id of
the template task this agent belongs to',
+ `task_map_id` int(11) DEFAULT NULL COMMENT 'Id of
the task this agent belongs to',
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of
the agent running the task, NULL if this is a template task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid
of the agent running the task',
`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node
name, which links to data_node table',
@@ -374,7 +374,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`,
`source_name`, `is_deleted`),
INDEX `source_status_index` (`status`, `is_deleted`),
INDEX `source_agent_ip_index` (`agent_ip`, `is_deleted`),
- INDEX `source_template_id_index` (`template_id`)
+ INDEX `source_task_map_id_index` (`task_map_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
diff --git a/inlong-manager/manager-web/sql/changes-1.12.0.sql
b/inlong-manager/manager-web/sql/changes-1.12.0.sql
index 82e8af66e5..a92011164f 100644
--- a/inlong-manager/manager-web/sql/changes-1.12.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.12.0.sql
@@ -24,5 +24,8 @@ SET FOREIGN_KEY_CHECKS = 0;
USE `apache_inlong_manager`;
ALTER TABLE `stream_source` ADD COLUMN `data_time_zone` varchar(256) DEFAULT
NULL COMMENT 'Data time zone';
+DROP INDEX `source_template_id_index` ON `stream_source`;
+CREATE INDEX source_task_map_id_index ON `stream_source` (`task_map_id`);
+ALTER TABLE `stream_source` CHANGE template_id task_map_id int(11) DEFAULT
NULL COMMENT 'Id of the task this agent belongs to';
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index f0cbbd56da..8e7645f993 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -23,6 +23,7 @@ import
org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -118,4 +119,10 @@ public class StreamSourceController {
sourceService.forceDelete(inlongGroupId, inlongStreamId,
LoginUserUtils.getLoginUser().getName()));
}
+ @RequestMapping(value = "/source/addDataAddTask", method =
RequestMethod.POST)
+ @ApiOperation(value = "Add supplementary recording task for stream source")
+ public Response<Integer> addSub(@RequestBody DataAddTaskRequest request) {
+ return Response.success(sourceService.addDataAddTask(request,
LoginUserUtils.getLoginUser().getName()));
+ }
+
}
diff --git
a/inlong-manager/manager-web/src/main/resources/application-dev.properties
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 0bcebdc06e..8a9032d5ec 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -73,15 +73,15 @@ audit.ck.username=default
# ClickHouse password
audit.ck.password=
-# Database cleansing
+# Database clean
# If turned on, logically deleted data will be collected and permanently
deleted periodically
-data.cleansing.enabled=false
+data.clean.enabled=false
# The interval (in seconds) between the end of one execution and the start of
the next, default is 1800s (0.5 hour)
-data.cleansing.interval.seconds=1800
+data.clean.interval.seconds=1800
# Select the data whose latest modify time is some days before, default is 10
days
-data.cleansing.before.days=10
+data.clean.before.days=10
# The maximum size of data to be deleted in batch, default is 100
-data.cleansing.batchSize=100
+data.clean.batchSize=100
# Whether to use ZooKeeper to manage the Sort task config, default is false,
which means not using ZooKeeper
sort.enable.zookeeper=false
@@ -97,14 +97,19 @@ source.update.enabled=false
source.update.before.seconds=60
source.update.interval=60
+# If turned on, regularly clear expired data add tasks
+add.task.clean.enabled=false
+add.task.clean.interval.seconds=10
+add.task.retention.days=7
+
# If turned on, tasks in the incorrect state are periodically deleted
-source.cleansing.enabled=false
-source.cleansing.interval=600
+source.clean.enabled=false
+source.clean.interval.seconds=600
# Select the InlongGroupIds whose latest modification time is within how many
hours, the default is 10 hours
group.deleted.latest.hours=10
# The maximum size when querying InlongGroupIds in batches, those
InlongGroupIds will be used to delete the related StreamSources.
-group.deleted.batchSize=100
+group.deleted.batch.size=100
# If turned on, the groups could be deleted periodically.
group.deleted.enabled=false
diff --git
a/inlong-manager/manager-web/src/main/resources/application-prod.properties
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index a9c55b39b3..835822bf84 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -72,15 +72,15 @@ audit.ck.username=default
# ClickHouse password
audit.ck.password=
-# Database cleansing
+# Database clean
# If turned on, logically deleted data will be collected and permanently
deleted periodically
-data.cleansing.enabled=false
+data.clean.enabled=false
# The interval (in seconds) between the end of one execution and the start of
the next, default is 1800s (0.5 hour)
-data.cleansing.interval.seconds=1800
+data.clean.interval.seconds=1800
# Select the data whose latest modify time is some days before, default is 10
days
-data.cleansing.before.days=10
+data.clean.before.days=10
# The maximum size of data to be deleted in batch, default is 100
-data.cleansing.batchSize=100
+data.clean.batchSize=100
# Whether to use ZooKeeper to manage the Sort task config, default is false,
which means not using ZooKeeper
sort.enable.zookeeper=false
@@ -96,14 +96,19 @@ source.update.enabled=false
source.update.before.seconds=60
source.update.interval=60
+# If turned on, regularly clear expired data add tasks
+add.task.clean.enabled=false
+add.task.clean.interval.seconds=10
+add.task.retention.days=7
+
# If turned on, tasks in the incorrect state are periodically deleted
-source.cleansing.enabled=false
-source.cleansing.interval=600
+source.clean.enabled=false
+source.clean.interval.seconds=600
# Select the InlongGroupIds whose latest modification time is within how many
hours, the default is 10 hours
group.deleted.latest.hours=10
# The maximum size when querying InlongGroupIds in batches, those
InlongGroupIds will be used to delete the related StreamSources.
-group.deleted.batchSize=100
+group.deleted.batch.size=100
# If turned on, the groups could be deleted periodically.
group.deleted.enabled=false
diff --git
a/inlong-manager/manager-web/src/main/resources/application-test.properties
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 0bcebdc06e..8a9032d5ec 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -73,15 +73,15 @@ audit.ck.username=default
# ClickHouse password
audit.ck.password=
-# Database cleansing
+# Database clean
# If turned on, logically deleted data will be collected and permanently
deleted periodically
-data.cleansing.enabled=false
+data.clean.enabled=false
# The interval (in seconds) between the end of one execution and the start of
the next, default is 1800s (0.5 hour)
-data.cleansing.interval.seconds=1800
+data.clean.interval.seconds=1800
# Select the data whose latest modify time is some days before, default is 10
days
-data.cleansing.before.days=10
+data.clean.before.days=10
# The maximum size of data to be deleted in batch, default is 100
-data.cleansing.batchSize=100
+data.clean.batchSize=100
# Whether to use ZooKeeper to manage the Sort task config, default is false,
which means not using ZooKeeper
sort.enable.zookeeper=false
@@ -97,14 +97,19 @@ source.update.enabled=false
source.update.before.seconds=60
source.update.interval=60
+# If turned on, regularly clear expired data add tasks
+add.task.clean.enabled=false
+add.task.clean.interval.seconds=10
+add.task.retention.days=7
+
# If turned on, tasks in the incorrect state are periodically deleted
-source.cleansing.enabled=false
-source.cleansing.interval=600
+source.clean.enabled=false
+source.clean.interval.seconds=600
# Select the InlongGroupIds whose latest modification time is within how many
hours, the default is 10 hours
group.deleted.latest.hours=10
# The maximum size when querying InlongGroupIds in batches, those
InlongGroupIds will be used to delete the related StreamSources.
-group.deleted.batchSize=100
+group.deleted.batch.size=100
# If turned on, the groups could be deleted periodically.
group.deleted.enabled=false