This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 955e4d5d7852284fd6e3f97b0926c05083a7a683 Author: Lucas <[email protected]> AuthorDate: Thu Oct 13 15:33:59 2022 +0800 [INLONG-6159][Manager] Fix heartbeat status update failed error (#6161) * Fix heartbeat status update * Add comments and make the code more readable Co-authored-by: healchow <[email protected]> --- .../service/core/heartbeat/HeartbeatManager.java | 31 +++++++++++++++------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java index 6027fa908..757ab5bea 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java @@ -46,11 +46,13 @@ import javax.annotation.PostConstruct; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -@Component @Slf4j +@Component public class HeartbeatManager implements AbstractHeartbeatManager { private static final String AUTO_REGISTERED = "auto registered"; + private static final int UPDATED_ONE_ROW = 1; // updated one row + private static final int UPDATE_ZERO_ROW = 0; // no field updated @Getter private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache; @@ -92,18 +94,27 @@ public class HeartbeatManager implements AbstractHeartbeatManager { componentHeartbeat.getComponentType()); return; } + + // if the heartbeat was not in the cache, insert or update the node by the heartbeat info HeartbeatMsg lastHeartbeat = heartbeatCache.getIfPresent(componentHeartbeat); + boolean exist = true; + int updateNum = UPDATE_ZERO_ROW; if (lastHeartbeat == null) { + exist = false; InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeat); if (clusterNode == null) { - insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator()); - log.info("insert node success"); + updateNum = insertClusterNode(clusterInfo, heartbeat, clusterInfo.getCreator()); + log.info("insert node result: {}", updateNum); } else { - updateClusterNode(clusterNode); - log.info("update node success"); + updateNum = updateClusterNode(clusterNode); + log.info("update node result: {}", updateNum); } } - heartbeatCache.put(componentHeartbeat, heartbeat); + + // if the heartbeat already exists, or does not exist but insert/update success, then put it into the cache + if (exist || updateNum == UPDATED_ONE_ROW) { + heartbeatCache.put(componentHeartbeat, heartbeat); + } } private void evictClusterNode(HeartbeatMsg heartbeat) { @@ -134,7 +145,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager { return clusterNodeMapper.selectByUniqueKey(nodeRequest); } - private void insertClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeat, String creator) { + private int insertClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeat, String creator) { InlongClusterNodeEntity clusterNode = new InlongClusterNodeEntity(); clusterNode.setParentId(clusterInfo.getId()); clusterNode.setType(heartbeat.getComponentType()); @@ -144,12 +155,12 @@ public class HeartbeatManager implements AbstractHeartbeatManager { clusterNode.setCreator(creator); clusterNode.setModifier(creator); clusterNode.setDescription(AUTO_REGISTERED); - clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode); + return clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode); } - private void updateClusterNode(InlongClusterNodeEntity clusterNode) { + private int updateClusterNode(InlongClusterNodeEntity clusterNode) { clusterNode.setStatus(ClusterStatus.NORMAL.getStatus()); - clusterNodeMapper.updateById(clusterNode); + return clusterNodeMapper.updateById(clusterNode); } private ClusterInfo fetchCluster(ComponentHeartbeat componentHeartbeat) {
