This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 17cb04edc9 [INLONG-11142][Manager] Fix the problem of data add task
not scheduled for cleaning (#11143)
17cb04edc9 is described below
commit 17cb04edc909e7525dfd6927b6eaecb733b5c584
Author: fuweng11 <[email protected]>
AuthorDate: Thu Sep 19 17:23:52 2024 +0800
[INLONG-11142][Manager] Fix the problem of data add task not scheduled for
cleaning (#11143)
---
.../manager/dao/mapper/StreamSourceEntityMapper.java | 2 ++
.../main/resources/mappers/StreamSourceEntityMapper.xml | 10 ++++++++++
.../inlong/manager/service/core/impl/AgentServiceImpl.java | 14 +++++++++++++-
.../manager/service/source/file/FileSourceOperator.java | 3 ++-
4 files changed, 27 insertions(+), 2 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 7340d182c3..a05f15dda7 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -67,6 +67,8 @@ public interface StreamSourceEntityMapper {
*/
int selectDataAddTaskCount(@Param("groupId") String groupId,
@Param("streamId") String streamId);
+ List<StreamSourceEntity> selectByByTimeout(@Param("retentionDays") Integer
retentionDays);
+
/**
* Paging query source list based on conditions
*/
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 732a5cfd56..c68f20e167 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -133,6 +133,16 @@
and task_map_id is not NULL
</where>
</select>
+ <select id="selectByByTimeout"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_source
+ <where>
+ is_deleted = 0
+ and task_map_id is not null
+ and modify_time &lt;= DATE_ADD(NOW(), INTERVAL -#{retentionDays, jdbcType=INTEGER} DAY)
+ </where>
+ </select>
<select id="selectByCondition"
parameterType="org.apache.inlong.manager.pojo.source.SourcePageRequest"
resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 595b53576b..5f7958a3e1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -67,6 +67,7 @@ import
org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.module.ModuleDTO;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.node.AgentClusterNodeOperator;
@@ -229,8 +230,19 @@ public class AgentServiceImpl implements AgentService {
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(factory);
executor.scheduleWithFixedDelay(() -> {
try {
+ List<StreamSourceEntity> needDeletedList =
sourceMapper.selectByByTimeout(retentionDays);
sourceMapper.logicalDeleteByTimeout(retentionDays);
- LOGGER.info("clean sub task successfully");
+ if (CollectionUtils.isNotEmpty(needDeletedList)) {
+ for (StreamSourceEntity sourceEntity :
needDeletedList) {
+ LOGGER.info("begin to clean sub task for
source={}", sourceEntity);
+ StreamSourceOperator sourceOperator =
+
operatorFactory.getInstance(sourceEntity.getSourceType());
+ SourceRequest request =
+
CommonBeanUtils.copyProperties(sourceEntity, SourceRequest::new, true);
+ sourceOperator.updateAgentTaskConfig(request,
sourceEntity.getModifier());
+ LOGGER.info("success to clean sub task
successfully, ={}", sourceEntity.getId());
+ }
+ }
} catch (Throwable t) {
LOGGER.error("clean sub task error", t);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index 502808602b..60db39e546 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -124,7 +124,8 @@ public class FileSourceOperator extends
AbstractSourceOperator {
StreamSourceEntity dataAddTaskEntity =
CommonBeanUtils.copyProperties(sourceEntity,
StreamSourceEntity::new);
dataAddTaskEntity.setId(null);
- dataAddTaskEntity.setSourceName(sourceEntity.getSourceName() + "-"
+ (dataAddTaskSize + 1));
+ dataAddTaskEntity.setSourceName(
+ sourceEntity.getSourceName() + "-" + (dataAddTaskSize + 1)
+ "-" + sourceEntity.getId());
dataAddTaskEntity.setExtParams(objectMapper.writeValueAsString(dto));
dataAddTaskEntity.setTaskMapId(sourceEntity.getId());
Integer id = sourceMapper.insert(dataAddTaskEntity);