This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 10e512ff0 [INLONG-7199][Manager] Support save extension params for
inlong cluster node (#7200)
10e512ff0 is described below
commit 10e512ff0b87a1f7aa49c187bc3f8a656476efad
Author: fuweng11 <[email protected]>
AuthorDate: Tue Jan 10 20:43:41 2023 +0800
[INLONG-7199][Manager] Support save extension params for inlong cluster
node (#7200)
---
.../manager/common/consts/AgentConstants.java | 26 -------
.../dao/mapper/InlongClusterNodeEntityMapper.java | 2 +
.../mappers/InlongClusterNodeEntityMapper.xml | 41 ++++++++++
.../manager/pojo/cluster/ClusterNodeRequest.java | 3 +-
.../pojo/cluster/agent/AgentClusterNodeDTO.java | 64 +++++++++++++++
.../cluster/agent/AgentClusterNodeRequest.java | 46 +++++++++++
.../cluster/agent/AgentClusterNodeResponse.java | 46 +++++++++++
.../service/cluster/InlongClusterServiceImpl.java | 45 ++++-------
.../cluster/node/AbstractClusterNodeOperator.java | 81 +++++++++++++++++++
.../cluster/node/AgentClusterNodeOperator.java | 90 ++++++++++++++++++++++
.../cluster/node/DefaultClusterNodeOperator.java | 60 +++++++++++++++
.../cluster/node/InlongClusterNodeOperator.java | 62 +++++++++++++++
.../node/InlongClusterNodeOperatorFactory.java | 52 +++++++++++++
.../service/core/impl/AgentServiceImpl.java | 70 +++++++++--------
.../service/heartbeat/HeartbeatManager.java | 23 +++---
15 files changed, 613 insertions(+), 98 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
deleted file mode 100644
index 85da5a876..000000000
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.consts;
-
-/**
- * Constant class for agent ext params
- */
-public class AgentConstants {
-
- public static final String AGENT_GROUP_KEY = "agentGroup";
-}
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
index dabc837dd..83d2108a3 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java
@@ -43,6 +43,8 @@ public interface InlongClusterNodeEntityMapper {
int updateById(InlongClusterNodeEntity record);
+ int updateByIdSelective(InlongClusterNodeEntity record);
+
int deleteById(Integer id);
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 6465d3c91..b65afebc5 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -150,6 +150,47 @@
where id = #{id,jdbcType=INTEGER}
and version = #{version,jdbcType=INTEGER}
</update>
+ <update id="updateByIdSelective"
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
+ update inlong_cluster_node
+ <set>
+ <if test="parentId != null">
+ parent_id = #{parentId,jdbcType=INTEGER},
+ </if>
+ <if test="type != null">
+ type = #{type,jdbcType=VARCHAR},
+ </if>
+ <if test="ip != null">
+ ip = #{ip,jdbcType=VARCHAR},
+ </if>
+ <if test="port != null">
+ port = #{port,jdbcType=INTEGER},
+ </if>
+ <if test="protocolType != null">
+ protocol_type = #{protocolType,jdbcType=VARCHAR},
+ </if>
+ <if test="nodeLoad != null">
+ node_load = #{nodeLoad,jdbcType=INTEGER},
+ </if>
+ <if test="extParams != null">
+ ext_params = #{extParams,jdbcType=LONGVARCHAR},
+ </if>
+ <if test="description != null">
+ description = #{description,jdbcType=VARCHAR},
+ </if>
+ <if test="status != null">
+ status = #{status,jdbcType=INTEGER},
+ </if>
+ <if test="isDeleted != null">
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ </if>
+ <if test="modifier != null">
+ modifier = #{modifier,jdbcType=VARCHAR},
+ </if>
+ version = #{version,jdbcType=INTEGER} + 1
+ </set>
+ where id = #{id,jdbcType=INTEGER}
+ and version = #{version,jdbcType=INTEGER}
+ </update>
<delete id="deleteById" parameterType="java.lang.Integer">
delete
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
index 5e789100b..80fe962f8 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.pojo.cluster;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@@ -31,6 +32,7 @@ import javax.validation.constraints.NotNull;
*/
@Data
@ApiModel("Cluster node request")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type",
defaultImpl = ClusterNodeRequest.class)
public class ClusterNodeRequest {
@NotNull(groups = UpdateValidation.class)
@@ -53,7 +55,6 @@ public class ClusterNodeRequest {
@NotNull(message = "port cannot be null")
@ApiModelProperty(value = "Cluster port")
- @Length(max = 6, message = "length must be less than or equal to 6")
private Integer port;
@NotBlank(message = "protocolType cannot be blank")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeDTO.java
new file mode 100644
index 000000000..1d6e4a6a4
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeDTO.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.agent;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Agent cluster node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Agent cluster node info")
+public class AgentClusterNodeDTO {
+
+ @ApiModelProperty(value = "Agent group name")
+ private String agentGroup;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static AgentClusterNodeDTO getFromRequest(AgentClusterNodeRequest
request) {
+ return CommonBeanUtils.copyProperties(request,
AgentClusterNodeDTO::new, true);
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static AgentClusterNodeDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, AgentClusterNodeDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+ ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ }
+ }
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeRequest.java
new file mode 100644
index 000000000..1cc3b4242
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.agent;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+
+/**
+ * Inlong cluster node request for Agent
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.AGENT)
+@ApiModel("Inlong cluster node request for Agent")
+public class AgentClusterNodeRequest extends ClusterNodeRequest {
+
+ @ApiModelProperty(value = "Agent group name")
+ private String agentGroup;
+
+ public AgentClusterNodeRequest() {
+ this.setType(ClusterType.AGENT);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeResponse.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeResponse.java
new file mode 100644
index 000000000..af43ae221
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/agent/AgentClusterNodeResponse.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.agent;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
+
+/**
+ * Inlong cluster node response for Agent
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.AGENT)
+@ApiModel("Inlong cluster node response for Agent")
+public class AgentClusterNodeResponse extends ClusterNodeResponse {
+
+ @ApiModelProperty(value = "Agent group name")
+ private String agentGroup;
+
+ public AgentClusterNodeResponse() {
+ this.setType(ClusterType.AGENT);
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 6ddd96575..9ece6b2b0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -69,6 +69,8 @@ import
org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.user.UserInfo;
+import
org.apache.inlong.manager.service.cluster.node.InlongClusterNodeOperator;
+import
org.apache.inlong.manager.service.cluster.node.InlongClusterNodeOperatorFactory;
import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,6 +108,8 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
@Autowired
private InlongClusterOperatorFactory clusterOperatorFactory;
@Autowired
+ private InlongClusterNodeOperatorFactory clusterNodeOperatorFactory;
+ @Autowired
private InlongClusterTagEntityMapper clusterTagMapper;
@Autowired
private InlongClusterEntityMapper clusterMapper;
@@ -985,14 +989,8 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}
-
- InlongClusterNodeEntity entity =
CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
- entity.setCreator(operator);
- entity.setModifier(operator);
- clusterNodeMapper.insert(entity);
-
- LOGGER.info("success to add inlong cluster node={}", request);
- return entity.getId();
+ InlongClusterNodeOperator instance =
clusterNodeOperatorFactory.getInstance(request.getType());
+ return instance.saveOpt(request, operator);
}
@Override
@@ -1027,11 +1025,9 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
request.getType(), request.getIp(),
request.getPort()));
}
// add record
- InlongClusterNodeEntity clusterNode =
CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
- clusterNode.setCreator(opInfo.getName());
- clusterNode.setModifier(opInfo.getName());
- clusterNodeMapper.insert(clusterNode);
- return entity.getId();
+ InlongClusterNodeOperator instance =
clusterNodeOperatorFactory.getInstance(request.getType());
+ instance.saveOpt(request, opInfo.getName());
+ return instance.saveOpt(request, opInfo.getName());
}
@Override
@@ -1045,9 +1041,8 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
InlongClusterEntity cluster =
clusterMapper.selectById(entity.getParentId());
String message = "Current user does not have permission to get cluster
node";
checkUser(cluster, currentUser, message);
- ClusterNodeResponse clusterNodeResponse =
CommonBeanUtils.copyProperties(entity, ClusterNodeResponse::new);
- LOGGER.debug("success to get inlong cluster node by id={}", id);
- return clusterNodeResponse;
+ InlongClusterNodeOperator instance =
clusterNodeOperatorFactory.getInstance(entity.getType());
+ return instance.getFromEntity(entity);
}
@Override
@@ -1267,14 +1262,8 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
String message = "Current user does not have permission to update
cluster node";
checkUser(cluster, operator, message);
- CommonBeanUtils.copyProperties(request, entity, true);
- entity.setParentId(request.getParentId());
- entity.setModifier(operator);
- if (InlongConstants.AFFECTED_ONE_ROW !=
clusterNodeMapper.updateById(entity)) {
- LOGGER.warn(errMsg);
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
- LOGGER.info("success to update inlong cluster node={}", request);
+ InlongClusterNodeOperator instance =
clusterNodeOperatorFactory.getInstance(request.getType());
+ instance.updateOpt(request, operator);
return true;
}
@@ -1322,12 +1311,8 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
"inlong cluster node already exist for " + request);
}
// update record
- CommonBeanUtils.copyProperties(request, entity, true);
- entity.setParentId(request.getParentId());
- entity.setModifier(opInfo.getName());
- if (InlongConstants.AFFECTED_ONE_ROW !=
clusterNodeMapper.updateById(entity)) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
+ InlongClusterNodeOperator instance =
clusterNodeOperatorFactory.getInstance(request.getType());
+ instance.updateOpt(request, opInfo.getName());
return true;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
new file mode 100644
index 000000000..9f971bbd5
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AbstractClusterNodeOperator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster.node;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * Abstract base operator of inlong cluster node.
+ */
+public abstract class AbstractClusterNodeOperator implements
InlongClusterNodeOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractClusterNodeOperator.class);
+
+ @Autowired
+ protected InlongClusterNodeEntityMapper clusterNodeMapper;
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class)
+ public Integer saveOpt(ClusterNodeRequest request, String operator) {
+ InlongClusterNodeEntity entity =
CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
+ // set the ext params
+ this.setTargetEntity(request, entity);
+
+ entity.setCreator(operator);
+ entity.setModifier(operator);
+ clusterNodeMapper.insert(entity);
+
+ return entity.getId();
+ }
+
+ /**
+ * Set the parameters of the target entity.
+ *
+ * @param request inlong cluster node request
+ * @param targetEntity entity which will set the new parameters
+ */
+ protected abstract void setTargetEntity(ClusterNodeRequest request,
InlongClusterNodeEntity targetEntity);
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ)
+ public void updateOpt(ClusterNodeRequest request, String operator) {
+ InlongClusterNodeEntity entity =
CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
+ // set the ext params
+ this.setTargetEntity(request, entity);
+ entity.setModifier(operator);
+ if (InlongConstants.AFFECTED_ONE_ROW !=
clusterNodeMapper.updateByIdSelective(entity)) {
+ String errMsg = String.format(
+ "cluster node has already updated with ip=%s, port=%s,
protocolType=%s, type=%s, curVersion=%s",
+ entity.getIp(), entity.getPort(),
entity.getProtocolType(), entity.getType(), entity.getVersion());
+ LOGGER.warn(errMsg);
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, errMsg);
+ }
+ LOGGER.info("success to update inlong cluster node={}", request);
+ }
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java
new file mode 100644
index 000000000..38b0ce42e
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster.node;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Agent cluster node operator.
+ */
+@Slf4j
+@Service
+public class AgentClusterNodeOperator extends AbstractClusterNodeOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AgentClusterNodeOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String clusterNodeType) {
+ return getClusterNodeType().equals(clusterNodeType);
+ }
+
+ @Override
+ public String getClusterNodeType() {
+ return ClusterType.AGENT;
+ }
+
+ @Override
+ public ClusterNodeResponse getFromEntity(InlongClusterNodeEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+ }
+
+ AgentClusterNodeResponse agentClusterNodeResponse = new
AgentClusterNodeResponse();
+ CommonBeanUtils.copyProperties(entity, agentClusterNodeResponse);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ AgentClusterNodeDTO dto =
AgentClusterNodeDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, agentClusterNodeResponse);
+ }
+
+ LOGGER.debug("success to get agent cluster node info from entity");
+ return agentClusterNodeResponse;
+ }
+
+ @Override
+ protected void setTargetEntity(ClusterNodeRequest request,
InlongClusterNodeEntity targetEntity) {
+ AgentClusterNodeRequest agentNodeRequest = (AgentClusterNodeRequest)
request;
+ CommonBeanUtils.copyProperties(agentNodeRequest, targetEntity, true);
+ try {
+ AgentClusterNodeDTO dto =
AgentClusterNodeDTO.getFromRequest(agentNodeRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ LOGGER.debug("success to set entity for agent cluster node");
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+ ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " +
e.getMessage());
+ }
+ }
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/DefaultClusterNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/DefaultClusterNodeOperator.java
new file mode 100644
index 000000000..a684f4b86
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/DefaultClusterNodeOperator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster.node;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Default cluster node operator.
+ */
+@Slf4j
+@Service
+public class DefaultClusterNodeOperator extends AbstractClusterNodeOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultClusterNodeOperator.class);
+ public static final String DEFAULT = "DEFAULT";
+
+ @Override
+ public Boolean accept(String clusterNodeType) {
+ return getClusterNodeType().equals(clusterNodeType);
+ }
+
+ @Override
+ public String getClusterNodeType() {
+ return DEFAULT;
+ }
+
+ @Override
+ public ClusterNodeResponse getFromEntity(InlongClusterNodeEntity entity) {
+ ClusterNodeResponse clusterNodeResponse =
CommonBeanUtils.copyProperties(entity, ClusterNodeResponse::new);
+ LOGGER.debug("success to get inlong cluster node by id={}",
entity.getId());
+ return clusterNodeResponse;
+ }
+
+ @Override
+ protected void setTargetEntity(ClusterNodeRequest request,
InlongClusterNodeEntity targetEntity) {
+ LOGGER.info("do nothing for default cluster node in set target
entity");
+ }
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperator.java
new file mode 100644
index 000000000..075f8fcb5
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster.node;
+
+import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
+
+public interface InlongClusterNodeOperator {
+
+ /**
+ * Determines whether the current instance matches the specified type.
+ */
+ Boolean accept(String clusterType);
+
+ /**
+ * Get the cluster node type.
+ *
+ * @return cluster node type string
+ */
+ String getClusterNodeType();
+
+ /**
+ * Save the inlong cluster node info.
+ *
+ * @param request request of the cluster node
+ * @param operator name of the operator
+ * @return cluster node id after saving
+ */
+ Integer saveOpt(ClusterNodeRequest request, String operator);
+
+ /**
+ * Get the cluster node info from the given entity.
+ *
+ * @param entity get field value from the entity
+ * @return cluster node info after encapsulating
+ */
+ ClusterNodeResponse getFromEntity(InlongClusterNodeEntity entity);
+
+ /**
+ * Update the inlong cluster node info.
+ *
+ * @param request request of update
+ * @param operator name of operator
+ */
+ void updateOpt(ClusterNodeRequest request, String operator);
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperatorFactory.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperatorFactory.java
new file mode 100644
index 000000000..d7b6873e2
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/InlongClusterNodeOperatorFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster.node;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Factory that resolves the {@link InlongClusterNodeOperator} for a given cluster type.
+ */
+@Service
+public class InlongClusterNodeOperatorFactory {
+
+    @Autowired
+    private List<InlongClusterNodeOperator> clusterNodeOperatorList;
+
+    public static final String DEFAULT = "DEFAULT";
+
+    /**
+     * Get a cluster node operator instance via the given type.
+     * The first operator accepting the type wins; when none does, the first operator
+     * accepting {@link #DEFAULT} is used instead.
+     *
+     * @param type cluster type to look up
+     * @return the operator accepting the type, or the one accepting {@link #DEFAULT}
+     * @throws BusinessException with CLUSTER_TYPE_NOT_SUPPORTED when neither lookup finds an operator
+     */
+    public InlongClusterNodeOperator getInstance(String type) {
+        InlongClusterNodeOperator matched = firstAccepting(type);
+        if (matched == null) {
+            // No dedicated operator for this type: fall back to the default one.
+            matched = firstAccepting(DEFAULT);
+        }
+        if (matched == null) {
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_TYPE_NOT_SUPPORTED,
+                    String.format(ErrorCodeEnum.CLUSTER_TYPE_NOT_SUPPORTED.getMessage(), type));
+        }
+        return matched;
+    }
+
+    /**
+     * Returns the first registered operator that accepts the cluster type, or null when none does.
+     */
+    private InlongClusterNodeOperator firstAccepting(String clusterType) {
+        for (InlongClusterNodeOperator candidate : clusterNodeOperatorList) {
+            if (candidate.accept(clusterType)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 13918600c..035bcc993 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.core.impl;
+import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
@@ -35,7 +36,6 @@ import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
-import org.apache.inlong.manager.common.consts.AgentConstants;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
@@ -56,8 +56,9 @@ import
org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
-import
org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
+import
org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
@@ -79,7 +80,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -221,16 +221,19 @@ public class AgentServiceImpl implements AgentService {
pageRequest.setType(ClusterType.AGENT);
pageRequest.setKeyword(clusterNode);
return
clusterNodeMapper.selectByCondition(pageRequest).stream();
- }).filter(entity -> entity != null)
+ }).filter(Objects::nonNull)
.forEach(entity -> {
- Map<String, String> extParams = entity.getExtParams()
== null ? new HashMap<>()
- : GSON.fromJson(entity.getExtParams(),
Map.class);
- Set<String> groupSet =
!extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>()
- : Sets.newHashSet(
-
extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA));
+ Set<String> groupSet = new HashSet<>();
+ AgentClusterNodeDTO agentClusterNodeDTO = new
AgentClusterNodeDTO();
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ agentClusterNodeDTO =
AgentClusterNodeDTO.getFromJson(entity.getExtParams());
+ String agentGroup =
agentClusterNodeDTO.getAgentGroup();
+ groupSet = StringUtils.isBlank(agentGroup) ?
groupSet
+ :
Sets.newHashSet(agentGroup.split(InlongConstants.COMMA));
+ }
groupSet.add(request.getAgentGroup());
- extParams.put(AgentConstants.AGENT_GROUP_KEY,
String.join(InlongConstants.COMMA, groupSet));
- entity.setExtParams(GSON.toJson(extParams));
+
agentClusterNodeDTO.setAgentGroup(Joiner.on(",").join(groupSet));
+ entity.setExtParams(GSON.toJson(agentClusterNodeDTO));
clusterNodeMapper.insertOnDuplicateKeyUpdate(entity);
});
}
@@ -242,16 +245,19 @@ public class AgentServiceImpl implements AgentService {
pageRequest.setType(ClusterType.AGENT);
pageRequest.setKeyword(clusterNode);
return
clusterNodeMapper.selectByCondition(pageRequest).stream();
- }).filter(entity -> entity != null)
+ }).filter(Objects::nonNull)
.forEach(entity -> {
- Map<String, String> extParams = entity.getExtParams()
== null ? new HashMap<>()
- : GSON.fromJson(entity.getExtParams(),
Map.class);
- Set<String> groupSet =
!extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>()
- : Sets.newHashSet(
-
extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA));
+ Set<String> groupSet = new HashSet<>();
+ AgentClusterNodeDTO agentClusterNodeDTO = new
AgentClusterNodeDTO();
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ agentClusterNodeDTO =
AgentClusterNodeDTO.getFromJson(entity.getExtParams());
+ String agentGroup =
agentClusterNodeDTO.getAgentGroup();
+ groupSet = StringUtils.isBlank(agentGroup) ?
groupSet
+ :
Sets.newHashSet(agentGroup.split(InlongConstants.COMMA));
+ }
groupSet.remove(request.getAgentGroup());
- extParams.put(AgentConstants.AGENT_GROUP_KEY,
String.join(InlongConstants.COMMA, groupSet));
- entity.setExtParams(GSON.toJson(extParams));
+
agentClusterNodeDTO.setAgentGroup(Joiner.on(",").join(groupSet));
+ entity.setExtParams(GSON.toJson(agentClusterNodeDTO));
clusterNodeMapper.insertOnDuplicateKeyUpdate(entity);
});
}
@@ -361,9 +367,9 @@ public class AgentServiceImpl implements AgentService {
/**
* Find file collecting task match those condition:
- * 1.agent ip match
- * 2.cluster name match
- * Send the corresponding task action request according to the matching
state of the tag and the current state
+ * 1.agent ip match
+ * 2.cluster name match
+ * Send the corresponding task action request according to the matching
state of the tag and the current state
*/
private void preProcessLabelFileTasks(TaskRequest taskRequest) {
List<Integer> needProcessedStatusList = Arrays.asList(
@@ -387,7 +393,7 @@ public class AgentServiceImpl implements AgentService {
Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet(
SourceStatus.SOURCE_FROZEN,
SourceStatus.TO_BE_ISSUED_FROZEN);
- if (!matchLabel(sourceEntity, clusterNodeEntity)
+ if (!matchGroup(sourceEntity, clusterNodeEntity)
&&
!exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus())))
{
LOGGER.info("Transform task({}) from {} to {} because tag
mismatch "
+ "for agent({}) in cluster({})",
sourceEntity.getAgentIp(),
@@ -406,7 +412,7 @@ public class AgentServiceImpl implements AgentService {
SourceStatus.TO_BE_ISSUED_ACTIVE);
Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(
StreamStatus.SUSPENDED, StreamStatus.SUSPENDED);
- if (matchLabel(sourceEntity, clusterNodeEntity)
+ if (matchGroup(sourceEntity, clusterNodeEntity)
&&
!exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))
&&
!exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus())))
{
LOGGER.info("Transform task({}) from {} to {} because tag
rematch "
@@ -575,7 +581,7 @@ public class AgentServiceImpl implements AgentService {
}).collect(Collectors.toList());
}
- private boolean matchLabel(StreamSourceEntity sourceEntity,
InlongClusterNodeEntity clusterNodeEntity) {
+ private boolean matchGroup(StreamSourceEntity sourceEntity,
InlongClusterNodeEntity clusterNodeEntity) {
Preconditions.checkNotNull(sourceEntity, "cluster must be valid");
if (sourceEntity.getInlongClusterNodeGroup() == null) {
return true;
@@ -585,12 +591,16 @@ public class AgentServiceImpl implements AgentService {
return false;
}
- Map<String, String> extParams =
GSON.fromJson(clusterNodeEntity.getExtParams(), Map.class);
- Set<String> clusterNodeLabels =
!extParams.containsKey(AgentConstants.AGENT_GROUP_KEY) ? new HashSet<>()
- :
Sets.newHashSet(extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA));
- Set<String> sourceLabels = Stream.of(
+ Set<String> clusterNodeGroups = new HashSet<>();
+ if (StringUtils.isNotBlank(clusterNodeEntity.getExtParams())) {
+ AgentClusterNodeDTO agentClusterNodeDTO =
AgentClusterNodeDTO.getFromJson(clusterNodeEntity.getExtParams());
+ String agentGroup = agentClusterNodeDTO.getAgentGroup();
+ clusterNodeGroups = StringUtils.isBlank(agentGroup) ? new
HashSet<>()
+ : Sets.newHashSet(agentGroup.split(InlongConstants.COMMA));
+ }
+ Set<String> sourceGroups = Stream.of(
sourceEntity.getInlongClusterNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet());
- return sourceLabels.stream().anyMatch(clusterNodeLabels::contains);
+ return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 2fb8d5608..4232e4392 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -22,6 +22,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.base.Joiner;
import com.google.gson.Gson;
import lombok.Getter;
import lombok.SneakyThrows;
@@ -31,7 +32,6 @@ import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
-import org.apache.inlong.manager.common.consts.AgentConstants;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterStatus;
import org.apache.inlong.manager.common.enums.NodeStatus;
@@ -43,6 +43,7 @@ import
org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
+import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
import org.apache.inlong.manager.service.cluster.InlongClusterOperator;
import org.apache.inlong.manager.service.cluster.InlongClusterOperatorFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -50,9 +51,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -224,24 +223,26 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
clusterNode.setCreator(creator);
clusterNode.setModifier(creator);
clusterNode.setDescription(AUTO_REGISTERED);
- insertOrUpdateLabel(clusterNode, heartbeat);
+ insertOrUpdateNodeGroup(clusterNode, heartbeat);
return clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
}
+ /**
+ * Sets the node status to NORMAL, copies the load from the heartbeat, applies the
+ * heartbeat's node group to the ext params, and updates the node by id.
+ *
+ * @param clusterNode cluster node entity to update
+ * @param heartbeat heartbeat message providing the load and node group
+ * @return the value returned by the mapper's updateById call
+ */
private int updateClusterNode(InlongClusterNodeEntity clusterNode,
HeartbeatMsg heartbeat) {
clusterNode.setStatus(ClusterStatus.NORMAL.getStatus());
clusterNode.setNodeLoad(heartbeat.getLoad());
- insertOrUpdateLabel(clusterNode, heartbeat);
+ insertOrUpdateNodeGroup(clusterNode, heartbeat);
return clusterNodeMapper.updateById(clusterNode);
}
- private void insertOrUpdateLabel(InlongClusterNodeEntity clusterNode,
HeartbeatMsg heartbeat) {
- Set<String> groupSet = heartbeat.getNodeGroup() == null ? new
HashSet<>()
+ /**
+ * Writes the node group reported by the heartbeat into the ext params of the cluster node.
+ * Existing ext params, when present, are parsed with AgentClusterNodeDTO.getFromJson and
+ * their agent group is overwritten; otherwise a fresh DTO is used.
+ *
+ * @param clusterNode cluster node entity whose ext params are replaced in place
+ * @param heartbeat heartbeat message carrying the comma-separated node group, may be blank
+ */
+ private void insertOrUpdateNodeGroup(InlongClusterNodeEntity clusterNode,
HeartbeatMsg heartbeat) {
+ Set<String> groupSet = StringUtils.isBlank(heartbeat.getNodeGroup()) ?
new HashSet<>()
:
Arrays.stream(heartbeat.getNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet());
- Map<String, String> extParams = clusterNode.getExtParams() == null ?
new HashMap<>()
- : GSON.fromJson(clusterNode.getExtParams(), Map.class);
- extParams.put(AgentConstants.AGENT_GROUP_KEY,
String.join(InlongConstants.COMMA, groupSet));
- clusterNode.setExtParams(GSON.toJson(extParams));
+ AgentClusterNodeDTO agentClusterNodeDTO = new AgentClusterNodeDTO();
+ if (StringUtils.isNotBlank(clusterNode.getExtParams())) {
+ agentClusterNodeDTO =
AgentClusterNodeDTO.getFromJson(clusterNode.getExtParams());
+ }
+ // Set the group outside the branch: a node without stored ext params (e.g. one being
+ // registered for the first time) must still record the group from the heartbeat,
+ // as the removed map-based code did.
+ agentClusterNodeDTO.setAgentGroup(Joiner.on(InlongConstants.COMMA).join(groupSet));
+ clusterNode.setExtParams(GSON.toJson(agentClusterNodeDTO));
}
private int deleteClusterNode(InlongClusterNodeEntity clusterNode) {