This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 6972020d1 [INLONG-5953][Manager] Filter DataProxy nodes by status when necessary (#5954)
6972020d1 is described below
commit 6972020d1795b00e88ffe761c0cf3b83a33ef2c0
Author: woofyzhao <[email protected]>
AuthorDate: Wed Sep 21 11:32:13 2022 +0800
[INLONG-5953][Manager] Filter DataProxy nodes by status when necessary (#5954)
---
.../mappers/InlongClusterNodeEntityMapper.xml | 5 ++++-
.../inlong/manager/pojo/node/DataNodeRequest.java | 2 +-
.../service/cluster/InlongClusterServiceImpl.java | 8 +++++--
.../service/cluster/InlongClusterServiceTest.java | 25 +++++++++++++++++++++-
4 files changed, 35 insertions(+), 5 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index bb4a23f10..c184eab02 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -95,9 +95,12 @@
<if test="type != null and type != ''">
and type = #{type, jdbcType=VARCHAR}
</if>
- <if test="parentId != null and parentId != ''">
+ <if test="parentId != null">
and parent_id = #{parentId, jdbcType=INTEGER}
</if>
+ <if test="status != null">
+ and status = #{status, jdbcType=INTEGER}
+ </if>
<if test="keyword != null and keyword != ''">
and (
ip like CONCAT('%', #{keyword}, '%')
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
index 54504e2dc..4f520bf21 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
@@ -34,7 +34,7 @@ import javax.validation.constraints.NotNull;
@Data
@NoArgsConstructor
@AllArgsConstructor
-@ApiModel("Data node request")
+@ApiModel("Data node request")
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
public abstract class DataNodeRequest {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index d3a05b7ff..e175f8c00 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -649,11 +650,14 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
throw new BusinessException(msg);
}
- // if more than one data proxy cluster, currently takes first
// TODO consider the data proxy load and re-balance
List<DataProxyNodeInfo> nodeInfos = new ArrayList<>();
for (InlongClusterEntity entity : clusterList) {
- List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(entity.getId());
+ ClusterPageRequest request = ClusterPageRequest.builder()
+ .parentId(entity.getId())
+ .status(NodeStatus.NORMAL.getStatus())
+ .build();
+ List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByCondition(request);
for (InlongClusterNodeEntity nodeEntity : nodeList) {
DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo();
nodeInfo.setId(nodeEntity.getId());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index a7249e866..23b2326ce 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.cluster;
+import org.apache.inlong.common.enums.ComponentTypeEnum;
+import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.manager.common.consts.MQType;
@@ -31,10 +33,12 @@ import
org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import java.util.Comparator;
import java.util.List;
/**
@@ -44,6 +48,8 @@ public class InlongClusterServiceTest extends ServiceBaseTest
{
@Autowired
private InlongClusterService clusterService;
+ @Autowired
+ private HeartbeatManager heartbeatManager;
/**
* Save data proxy cluster
@@ -139,6 +145,16 @@ public class InlongClusterServiceTest extends
ServiceBaseTest {
return clusterService.updateNode(request, GLOBAL_OPERATOR);
}
+ private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, int port, String type) {
+ HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
+ heartbeatMsg.setIp(ip);
+ heartbeatMsg.setPort(port);
+ heartbeatMsg.setComponentType(type);
+ heartbeatMsg.setReportTime(System.currentTimeMillis());
+ heartbeatMsg.setClusterName(clusterName);
+ return heartbeatMsg;
+ }
+
/**
* Delete cluster node info.
*/
@@ -201,7 +217,7 @@ public class InlongClusterServiceTest extends
ServiceBaseTest {
}
@Test
- public void testGetDataProxyIp() {
+ public void testGetDataProxyIp() throws InterruptedException {
String clusterTag = "default_cluster";
String clusterName = "test_data_proxy";
String extTag = "ext_1";
@@ -220,6 +236,12 @@ public class InlongClusterServiceTest extends
ServiceBaseTest {
Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip,
port2);
Assertions.assertNotNull(nodeId2);
+ // report heartbeat
+ HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, port1, ComponentTypeEnum.DataProxy.getName());
+ heartbeatManager.reportHeartbeat(msg1);
+ HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, port2, ComponentTypeEnum.DataProxy.getName());
+ heartbeatManager.reportHeartbeat(msg2);
+
// create an inlong group which use the clusterTag
String inlongGroupId = "test_cluster_tag_group";
InlongGroupInfo inlongGroup = super.createInlongGroup(inlongGroupId,
MQType.PULSAR);
@@ -230,6 +252,7 @@ public class InlongClusterServiceTest extends
ServiceBaseTest {
// get the data proxy nodes, the first port should is p1, second port
is p2
DataProxyNodeResponse nodeResponse =
clusterService.getDataProxyNodes(inlongGroupId);
List<DataProxyNodeInfo> ipList = nodeResponse.getNodeList();
+ ipList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));
Assertions.assertEquals(ipList.size(), 2);
Assertions.assertEquals(port1, ipList.get(0).getPort());
Assertions.assertEquals(port2, ipList.get(1).getPort());