This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 97e12593e2 [INLONG-10233][Manager] Report heartbeat removal port
restriction (#10234)
97e12593e2 is described below
commit 97e12593e2b622b2f0ff59f7b98b1bf1d1f8c57f
Author: fuweng11 <[email protected]>
AuthorDate: Mon May 20 09:40:29 2024 +0800
[INLONG-10233][Manager] Report heartbeat removal port restriction (#10234)
---
.../mappers/InlongClusterNodeEntityMapper.xml | 7 ++++-
.../service/heartbeat/HeartbeatManager.java | 33 +++++++++++++++-------
2 files changed, 29 insertions(+), 11 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index e3d54df45a..d0016a28e3 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -95,7 +95,12 @@
and parent_id = #{parentId,jdbcType=INTEGER}
and type = #{type, jdbcType=VARCHAR}
and ip = #{ip, jdbcType=VARCHAR}
- and port = #{port, jdbcType=INTEGER}
+ <if test="port != null and port != ''">
+ and port = #{port, jdbcType=INTEGER}
+ </if>
+ <if test="port == null or port == ''">
+ and port is null
+ </if>
<if test="protocolType != null and protocolType != ''">
and protocol_type = #{protocolType, jdbcType=VARCHAR}
</if>
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 53466f13cc..fd64f17a64 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -165,30 +165,39 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
}
// protocolType may be null, and the protocolTypes' length may be less
than ports' length
- String[] ports = heartbeat.getPort().split(InlongConstants.COMMA);
String[] ips = heartbeat.getIp().split(InlongConstants.COMMA);
+ String port = heartbeat.getPort();
+ String[] ports = null;
+ if (StringUtils.isNotBlank(port)) {
+ ports = port.split(InlongConstants.COMMA);
+ if (ports.length < ips.length) {
+ ports = null;
+ }
+ }
String[] reportSourceTypes = null;
- if (StringUtils.isNotBlank(heartbeat.getReportSourceType()) &&
ports.length > 1) {
+ if (StringUtils.isNotBlank(heartbeat.getReportSourceType()) &&
ips.length > 1) {
reportSourceTypes =
heartbeat.getReportSourceType().split(InlongConstants.COMMA);
- if (reportSourceTypes.length < ports.length) {
+ if (reportSourceTypes.length < ips.length) {
reportSourceTypes = null;
}
}
String protocolType = heartbeat.getProtocolType();
String[] protocolTypes = null;
- if (StringUtils.isNotBlank(protocolType) && ports.length > 1) {
+ if (StringUtils.isNotBlank(protocolType) && ips.length > 1) {
protocolTypes = protocolType.split(InlongConstants.COMMA);
- if (protocolTypes.length < ports.length) {
+ if (protocolTypes.length < ips.length) {
protocolTypes = null;
}
}
int handlerNum = 0;
- for (int i = 0; i < ports.length; i++) {
+ for (int i = 0; i < ips.length; i++) {
// deep clone the heartbeat
HeartbeatMsg heartbeatMsg =
JsonUtils.parseObject(JsonUtils.toJsonByte(heartbeat), HeartbeatMsg.class);
assert heartbeatMsg != null;
- heartbeatMsg.setPort(ports[i].trim());
+ if (StringUtils.isNotBlank(port)) {
+ heartbeatMsg.setPort(ports[i].trim());
+ }
heartbeatMsg.setIp(ips[i].trim());
if (reportSourceTypes != null) {
heartbeatMsg.setReportSourceType(reportSourceTypes[i].trim());
@@ -228,7 +237,7 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
}
// if the heartbeat already exists, or does not exist but
insert/update success, then put it into the cache
- if (lastHeartbeat == null || handlerNum == ports.length) {
+ if (lastHeartbeat == null || handlerNum == ips.length) {
heartbeatCache.put(componentHeartbeat, heartbeat);
}
}
@@ -292,7 +301,9 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
nodeRequest.setParentId(clusterInfo.getId());
nodeRequest.setType(heartbeat.getComponentType());
nodeRequest.setIp(heartbeat.getIp());
- nodeRequest.setPort(Integer.valueOf(heartbeat.getPort()));
+ if (StringUtils.isNotBlank(heartbeat.getPort())) {
+ nodeRequest.setPort(Integer.valueOf(heartbeat.getPort()));
+ }
nodeRequest.setProtocolType(heartbeat.getProtocolType());
return clusterNodeMapper.selectByUniqueKey(nodeRequest);
}
@@ -302,7 +313,9 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
clusterNode.setParentId(clusterInfo.getId());
clusterNode.setType(heartbeat.getComponentType());
clusterNode.setIp(heartbeat.getIp());
- clusterNode.setPort(Integer.valueOf(heartbeat.getPort()));
+ if (StringUtils.isNotBlank(heartbeat.getPort())) {
+ clusterNode.setPort(Integer.valueOf(heartbeat.getPort()));
+ }
clusterNode.setProtocolType(heartbeat.getProtocolType());
clusterNode.setNodeLoad(heartbeat.getLoad());
clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());