This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 72006de191 [INLONG-8539][Manager] Logically remove all sources when
agent heartbeat contains no group message (#8540)
72006de191 is described below
commit 72006de19148479600e5178670cef752ee79b285
Author: kipshi <[email protected]>
AuthorDate: Mon Jul 17 16:32:58 2023 +0800
[INLONG-8539][Manager] Logically remove all sources when agent heartbeat
contains no group message (#8540)
---
.../dao/mapper/StreamSourceEntityMapper.java | 11 ++++++++
.../resources/mappers/StreamSourceEntityMapper.xml | 14 ++++++++++
.../service/heartbeat/HeartbeatServiceImpl.java | 24 ++++++++++++++++-
.../service/core/impl/AgentServiceTest.java | 30 +++-------------------
4 files changed, 52 insertions(+), 27 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 7e3594369e..805c73206a 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -193,6 +193,17 @@ public interface StreamSourceEntityMapper {
*/
void updateStatusByDeleted();
+ /**
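+ * Logically delete stream sources by agent IP, and change their status at the same time.
+ *
+ * @param agentIp IP of the agent cluster node whose stream sources are matched
+ * @param status the new status to set on the matched stream sources
+ * @param targetStatus the current status a stream source must have to be matched;
+ *        if null, no status filter is applied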
+ *
+ */
+ void logicalDeleteByAgentIp(@Param("agentIp") String agentIp,
@Param("status") Integer status,
+ @Param("targetStatus") Integer targetStatus);
+
/**
* Physical delete stream sources by group id and stream id
*/
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 43d8045e03..700d740f2c 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -563,6 +563,20 @@
and status not in (99, 201, 301)
</where>
</update>
+ <update id="logicalDeleteByAgentIp">
+ update stream_source
+ <set>
+ is_deleted = id,
+ previous_status = status,
+ status = #{status, jdbcType=INTEGER},
+ version = version + 1
+ </set>
+ where is_deleted = 0
+ and agent_ip = #{agentIp, jdbcType=VARCHAR}
+ <if test="targetStatus != null">
+ and status = #{targetStatus, jdbcType=INTEGER}
+ </if>
+ </update>
<delete id="deleteByRelatedId">
delete
from stream_source
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
index d6f042f028..5b2cd5e46d 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.heartbeat.GroupHeartbeat;
import org.apache.inlong.common.heartbeat.StreamHeartbeat;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -30,6 +31,7 @@ import
org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity;
import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.heartbeat.ComponentHeartbeatResponse;
import org.apache.inlong.manager.pojo.heartbeat.GroupHeartbeatResponse;
@@ -64,6 +66,8 @@ public class HeartbeatServiceImpl implements HeartbeatService
{
@Lazy
private HeartbeatManager heartbeatManager;
@Autowired
+ private StreamSourceEntityMapper sourceMapper;
+ @Autowired
private ComponentHeartbeatEntityMapper componentHeartbeatMapper;
@Autowired
private GroupHeartbeatEntityMapper groupHeartbeatMapper;
@@ -82,9 +86,10 @@ public class HeartbeatServiceImpl implements
HeartbeatService {
heartbeatManager.reportHeartbeat(request);
ComponentTypeEnum componentType =
ComponentTypeEnum.forType(request.getComponentType());
switch (componentType) {
+ case Agent:
+ return updateAgentHeartbeatOpt(request);
case Sort:
case DataProxy:
- case Agent:
case Cache:
case SDK:
return updateHeartbeatOpt(request);
@@ -220,6 +225,23 @@ public class HeartbeatServiceImpl implements
HeartbeatService {
}
}
+ /**
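+ * Update the heartbeat for an agent. If the request contains no group heartbeats and the
+ * agent IP is not blank, logically delete the stream sources bound to that agent IP which
+ * are in SOURCE_NORMAL status, changing their status to SOURCE_DISABLE.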
+ * If type of stream_source is file, change status from heartbeat_timeout
+ *
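+ * @param request the heartbeat report request sent by the agent
+ * @return the result of the default heartbeat update for this request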
+ */
+ private Boolean updateAgentHeartbeatOpt(HeartbeatReportRequest request) {
+ // If the heartbeat message contains no group heartbeats, logically delete the sources of this agent IP
+ if (CollectionUtils.isEmpty(request.getGroupHeartbeats()) &&
StringUtils.isNotBlank(request.getIp())) {
+ String agentIp = request.getIp();
+ sourceMapper.logicalDeleteByAgentIp(agentIp,
SourceStatus.SOURCE_DISABLE.getCode(),
+ SourceStatus.SOURCE_NORMAL.getCode());
+ }
+ return updateHeartbeatOpt(request);
+ }
+
/**
* Default implementation for updating heartbeat
*/
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index bfdbffba8c..6ac8682346 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -236,38 +236,16 @@ class AgentServiceTest extends ServiceBaseTest {
// unbind group and mismatch
bindGroup(false, "group1");
TaskResult t1 = agent.pullTask();
- Assertions.assertEquals(1, t1.getDataConfigs().size());
- Assertions.assertEquals(1, t1.getDataConfigs().stream()
- .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) ==
ManagerOpEnum.FROZEN.getType())
- .collect(Collectors.toSet())
- .size());
- DataConfig d1 = t1.getDataConfigs().get(0);
- Assertions.assertEquals(sourceId, d1.getTaskId());
+ Assertions.assertEquals(0, t1.getDataConfigs().size());
// bind group and rematch
bindGroup(true, "group1");
TaskResult t2 = agent.pullTask();
- Assertions.assertEquals(0, t2.getDataConfigs().size());
- Assertions.assertEquals(0, t2.getDataConfigs().stream()
- .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) ==
ManagerOpEnum.ACTIVE.getType())
- .collect(Collectors.toSet())
- .size());
-
- // update group to config success
- final String groupId = sourceService.listSource(groupStream.getLeft(),
groupStream.getRight()).stream()
- .filter(source -> source.getTemplateId() != null)
- .findAny()
- .get()
- .getInlongGroupId();
- groupMapper.updateStatus(groupId,
GroupStatus.CONFIG_SUCCESSFUL.getCode(), GLOBAL_OPERATOR);
- TaskResult t3 = agent.pullTask();
- Assertions.assertEquals(1, t3.getDataConfigs().size());
- Assertions.assertEquals(1, t3.getDataConfigs().stream()
- .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) ==
ManagerOpEnum.ACTIVE.getType())
+ Assertions.assertEquals(1, t2.getDataConfigs().size());
+ Assertions.assertEquals(1, t2.getDataConfigs().stream()
+ .filter(dataConfig -> Integer.valueOf(dataConfig.getOp()) ==
ManagerOpEnum.ADD.getType())
.collect(Collectors.toSet())
.size());
- DataConfig d3 = t3.getDataConfigs().get(0);
- Assertions.assertEquals(sourceId, d3.getTaskId());
}
/**