This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5dea04d65 [INLONG-7912][Manager] Only response DataProxy nodes in
normal status (#7913)
5dea04d65 is described below
commit 5dea04d6526bd77dccaf28c5d4cce847c028f154
Author: fuweng11 <[email protected]>
AuthorDate: Fri May 5 21:25:23 2023 +0800
[INLONG-7912][Manager] Only response DataProxy nodes in normal status
(#7913)
---
.../service/cluster/InlongClusterServiceImpl.java | 6 +++++
.../cluster/node/AbstractClusterNodeOperator.java | 2 ++
.../service/cluster/InlongClusterServiceTest.java | 27 ++++++++++++++--------
3 files changed, 25 insertions(+), 10 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index be32bed28..d8fb62fbc 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -39,6 +39,7 @@ import
org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -1168,6 +1169,11 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
// TODO consider the data proxy load and re-balance
List<DataProxyNodeInfo> nodeList = new ArrayList<>();
for (InlongClusterNodeEntity nodeEntity : nodeEntities) {
+ if (Objects.equals(nodeEntity.getStatus(),
NodeStatus.HEARTBEAT_TIMEOUT.getStatus())) {
+ LOGGER.debug("dataproxy node was timeout, parentId={} ip={}
port={}", nodeEntity.getParentId(),
+ nodeEntity.getIp(), nodeEntity.getPort());
+ continue;
+ }
DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo();
nodeInfo.setId(nodeEntity.getId());
nodeInfo.setIp(nodeEntity.getIp());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
index b32c9ca75..85200c0bf 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.cluster.node;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
@@ -49,6 +50,7 @@ public abstract class AbstractClusterNodeOperator implements
InlongClusterNodeOp
entity.setCreator(operator);
entity.setModifier(operator);
+ entity.setStatus(NodeStatus.HEARTBEAT_TIMEOUT.getStatus());
clusterNodeMapper.insert(entity);
return entity.getId();
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index c4dabbbee..36aa96e0d 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -174,12 +174,13 @@ public class InlongClusterServiceTest extends
ServiceBaseTest {
return clusterService.updateNode(request, GLOBAL_OPERATOR);
}
- private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip,
String port, String type) {
+ private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip,
String port, String type,
+ String protocolType) {
HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
heartbeatMsg.setIp(ip);
heartbeatMsg.setPort(port);
heartbeatMsg.setClusterTag("default_cluster");
- heartbeatMsg.setProtocolType(ProtocolType.HTTP);
+ heartbeatMsg.setProtocolType(protocolType);
heartbeatMsg.setLoad(0xFFFF);
heartbeatMsg.setComponentType(type);
heartbeatMsg.setReportTime(System.currentTimeMillis());
@@ -330,14 +331,6 @@ public class InlongClusterServiceTest extends
ServiceBaseTest {
Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip,
port2, ProtocolType.TCP);
Assertions.assertNotNull(nodeId2);
- // report heartbeat
- HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip,
String.valueOf(port1),
- ComponentTypeEnum.DataProxy.getType());
- heartbeatManager.reportHeartbeat(msg1);
- HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip,
String.valueOf(port2),
- ComponentTypeEnum.DataProxy.getType());
- heartbeatManager.reportHeartbeat(msg2);
-
// create an inlong group which use the clusterTag
String inlongGroupId = "test_cluster_tag_group";
InlongGroupInfo inlongGroup = super.createInlongGroup(inlongGroupId,
MQType.PULSAR);
@@ -345,6 +338,13 @@ public class InlongClusterServiceTest extends
ServiceBaseTest {
updateGroupInfo.setInlongClusterTag(clusterTag);
groupService.update(updateGroupInfo.genRequest(), GLOBAL_OPERATOR);
+ // report heartbeat
+ HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip,
String.valueOf(port1),
+ ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP);
+ heartbeatManager.reportHeartbeat(msg1);
+ HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip,
String.valueOf(port2),
+ ComponentTypeEnum.DataProxy.getType(), ProtocolType.TCP);
+ heartbeatManager.reportHeartbeat(msg2);
// get the data proxy nodes, the first port should is p1, second port
is p2
DataProxyNodeResponse nodeResponse =
clusterService.getDataProxyNodes(inlongGroupId, ProtocolType.TCP);
List<DataProxyNodeInfo> nodeInfoList = nodeResponse.getNodeList();
@@ -353,6 +353,13 @@ public class InlongClusterServiceTest extends
ServiceBaseTest {
Assertions.assertEquals(port1, nodeInfoList.get(0).getPort());
Assertions.assertEquals(port2, nodeInfoList.get(1).getPort());
+ // report heartbeat
+ HeartbeatMsg msg3 = createHeartbeatMsg(clusterName, ip,
String.valueOf(port1),
+ ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP);
+ heartbeatManager.reportHeartbeat(msg3);
+ HeartbeatMsg msg4 = createHeartbeatMsg(clusterName, ip,
String.valueOf(port2),
+ ComponentTypeEnum.DataProxy.getType(), ProtocolType.HTTP);
+ heartbeatManager.reportHeartbeat(msg4);
nodeResponse = clusterService.getDataProxyNodes(inlongGroupId,
ProtocolType.HTTP);
nodeInfoList = nodeResponse.getNodeList();
nodeInfoList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));