This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0c02cfdee8 [INLONG-10635][Manager] Optimize the installer
configuration process (#10637)
0c02cfdee8 is described below
commit 0c02cfdee8476a8123b10903863574c46d0556bb
Author: fuweng11 <[email protected]>
AuthorDate: Wed Jul 17 11:40:28 2024 +0800
[INLONG-10635][Manager] Optimize the installer configuration process
(#10637)
---
.../common/pojo/agent/installer/ConfigResult.java | 2 +
.../common/pojo/agent/installer/ModuleConfig.java | 4 +
.../manager/dao/entity/AgentTaskConfigEntity.java | 1 +
.../dao/mapper/ModuleConfigEntityMapper.java | 8 +
.../dao/mapper/PackageConfigEntityMapper.java | 8 +
.../mappers/AgentTaskConfigEntityMapper.xml | 12 +-
.../main/resources/mappers/ModuleConfigMapper.xml | 8 +
.../main/resources/mappers/PackageConfigMapper.xml | 8 +
.../cluster/node/AgentClusterNodeOperator.java | 168 +++++++++++++++-
.../inlong/manager/service/core/ConfigLoader.java | 16 ++
.../service/core/impl/AgentServiceImpl.java | 218 ++++++++++++---------
.../service/core/impl/ConfigLoaderImpl.java | 27 +++
.../main/resources/h2/apache_inlong_manager.sql | 3 +-
.../manager-web/sql/apache_inlong_manager.sql | 3 +-
inlong-manager/manager-web/sql/changes-1.13.0.sql | 3 +-
.../src/main/resources/application-dev.properties | 3 +-
16 files changed, 389 insertions(+), 103 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
index b2c5426a81..781b0eb090 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ConfigResult.java
@@ -45,6 +45,8 @@ public class ConfigResult {
*/
private String md5;
+ private Integer version;
+
/**
* The list of module config list
*/
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ModuleConfig.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ModuleConfig.java
index c76e42522d..517903b2dc 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ModuleConfig.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/installer/ModuleConfig.java
@@ -32,6 +32,10 @@ import lombok.NoArgsConstructor;
public class ModuleConfig {
private Integer id;
+ /**
+ * The primary key ID of the manager module config table
+ */
+ private Integer entityId;
private String name;
/**
* The md5 of the module config
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java
index cc8c3ab529..ed539e370b 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/AgentTaskConfigEntity.java
@@ -36,6 +36,7 @@ public class AgentTaskConfigEntity implements Serializable {
private String configParams;
private String taskParams;
+ private String moduleParams;
private Integer isDeleted;
private String creator;
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ModuleConfigEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ModuleConfigEntityMapper.java
index cec1881377..253fe297ff 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ModuleConfigEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ModuleConfigEntityMapper.java
@@ -17,10 +17,14 @@
package org.apache.inlong.manager.dao.mapper;
+import org.apache.inlong.manager.common.tenant.MultiTenantQuery;
import org.apache.inlong.manager.dao.entity.ModuleConfigEntity;
import org.apache.inlong.manager.pojo.module.ModulePageRequest;
+import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.cursor.Cursor;
+import org.apache.ibatis.mapping.ResultSetType;
import org.springframework.stereotype.Repository;
import java.util.List;
@@ -36,4 +40,8 @@ public interface ModuleConfigEntityMapper {
List<ModuleConfigEntity> selectByCondition(@Param("request")
ModulePageRequest request);
+ /**
+ * Select all module configs and expose them as a cursor instead of a list.
+ * The mapped statement with the same id in ModuleConfigMapper.xml only selects rows with is_deleted = 0.
+ * The query is declared with MultiTenantQuery(with = false).
+ * NOTE(review): fetchSize = Integer.MIN_VALUE presumably asks the JDBC driver to stream rows one by one,
+ * confirm against the driver in use; MyBatis cursors are closeable, so callers are expected to close them.
+ *
+ * @return cursor over the module config entities
+ */
+ @MultiTenantQuery(with = false)
+ @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize =
Integer.MIN_VALUE)
+ Cursor<ModuleConfigEntity> selectAllModuleConfigs();
+
}
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/PackageConfigEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/PackageConfigEntityMapper.java
index 29ab4e675e..cf52b08b2c 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/PackageConfigEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/PackageConfigEntityMapper.java
@@ -17,10 +17,14 @@
package org.apache.inlong.manager.dao.mapper;
+import org.apache.inlong.manager.common.tenant.MultiTenantQuery;
import org.apache.inlong.manager.dao.entity.PackageConfigEntity;
import org.apache.inlong.manager.pojo.module.PackagePageRequest;
+import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.cursor.Cursor;
+import org.apache.ibatis.mapping.ResultSetType;
import org.springframework.stereotype.Repository;
import java.util.List;
@@ -36,4 +40,8 @@ public interface PackageConfigEntityMapper {
List<PackageConfigEntity> selectByCondition(@Param("request")
PackagePageRequest request);
+ /**
+ * Select all package configs and expose them as a cursor instead of a list.
+ * The mapped statement with the same id in PackageConfigMapper.xml only selects rows with is_deleted = 0.
+ * The query is declared with MultiTenantQuery(with = false).
+ * NOTE(review): fetchSize = Integer.MIN_VALUE presumably asks the JDBC driver to stream rows one by one,
+ * confirm against the driver in use; MyBatis cursors are closeable, so callers are expected to close them.
+ *
+ * @return cursor over the package config entities
+ */
+ @MultiTenantQuery(with = false)
+ @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize =
Integer.MIN_VALUE)
+ Cursor<PackageConfigEntity> selectAllPackageConfigs();
+
}
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml
index 9a5f237c8d..0624ec4af6 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/AgentTaskConfigEntityMapper.xml
@@ -26,6 +26,7 @@
<result column="cluster_name" jdbcType="VARCHAR"
property="clusterName"/>
<result column="config_params" jdbcType="VARCHAR"
property="configParams"/>
<result column="task_params" jdbcType="VARCHAR" property="taskParams"/>
+ <result column="module_params" jdbcType="VARCHAR"
property="moduleParams"/>
<result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
<result column="creator" jdbcType="VARCHAR" property="creator"/>
<result column="modifier" jdbcType="VARCHAR" property="modifier"/>
@@ -35,15 +36,15 @@
</resultMap>
<sql id="Base_Column_List">
- id, agent_ip, cluster_name, config_params, task_params, is_deleted,
creator, modifier, create_time, modify_time, version
+ id, agent_ip, cluster_name, config_params, task_params, module_params,
is_deleted, creator, modifier, create_time, modify_time, version
</sql>
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity">
insert into agent_task_config (id, agent_ip, cluster_name,
- config_params, task_params,
- creator, modifier)
+ config_params, task_params,
module_params,
+ creator, modifier)
values (#{id, jdbcType=INTEGER}, #{agentIp, jdbcType=VARCHAR},
#{clusterName, jdbcType=VARCHAR},
- #{configParams, jdbcType=VARCHAR}, #{taskParams,
jdbcType=VARCHAR},
+ #{configParams, jdbcType=VARCHAR}, #{taskParams,
jdbcType=VARCHAR}, #{moduleParams,jdbcType=VARCHAR},
#{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR})
</insert>
@@ -86,6 +87,9 @@
<if test="taskParams != null">
task_params = #{taskParams,jdbcType=VARCHAR},
</if>
+ <if test="moduleParams != null">
+ module_params = #{moduleParams,jdbcType=VARCHAR},
+ </if>
<if test="isDeleted != null">
is_deleted = #{isDeleted,jdbcType=INTEGER},
</if>
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/ModuleConfigMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/ModuleConfigMapper.xml
index 69008a0440..085fd140e5 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/ModuleConfigMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/ModuleConfigMapper.xml
@@ -101,4 +101,12 @@
</if>
</where>
</select>
+ <!-- Select the base columns of every row in module_config that is not logically deleted
+ (is_deleted = 0). The id matches ModuleConfigEntityMapper#selectAllModuleConfigs, which
+ returns the rows as a cursor. The where element drops the leading "and". -->
+ <select id="selectAllModuleConfigs"
resultType="org.apache.inlong.manager.dao.entity.ModuleConfigEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from module_config
+ <where>
+ and is_deleted = 0
+ </where>
+ </select>
</mapper>
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/PackageConfigMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/PackageConfigMapper.xml
index fc7421a0aa..2dad86a732 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/PackageConfigMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/PackageConfigMapper.xml
@@ -101,4 +101,12 @@
</if>
</where>
</select>
+ <!-- Select the base columns of every row in package_config that is not logically deleted
+ (is_deleted = 0). The id matches PackageConfigEntityMapper#selectAllPackageConfigs, which
+ returns the rows as a cursor. The where element drops the leading "and". -->
+ <select id="selectAllPackageConfigs"
resultType="org.apache.inlong.manager.dao.entity.PackageConfigEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from package_config
+ <where>
+ and is_deleted = 0
+ </where>
+ </select>
</mapper>
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java
index 1c191dc096..789461cdca 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeOperator.java
@@ -17,24 +17,52 @@
package org.apache.inlong.manager.service.cluster.node;
+import org.apache.inlong.common.pojo.agent.AgentResponseCode;
+import org.apache.inlong.common.pojo.agent.installer.ConfigRequest;
+import org.apache.inlong.common.pojo.agent.installer.ConfigResult;
+import org.apache.inlong.common.pojo.agent.installer.ModuleConfig;
+import org.apache.inlong.common.pojo.agent.installer.PackageConfig;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.ModuleType;
+import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.AgentTaskConfigEntity;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
+import org.apache.inlong.manager.dao.entity.ModuleConfigEntity;
+import org.apache.inlong.manager.dao.entity.PackageConfigEntity;
+import org.apache.inlong.manager.dao.mapper.AgentTaskConfigEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.ModuleConfigEntityMapper;
+import org.apache.inlong.manager.dao.mapper.PackageConfigEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeResponse;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeResponse;
+import org.apache.inlong.manager.pojo.module.ModuleDTO;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
/**
* Agent cluster node operator.
@@ -44,9 +72,17 @@ import org.springframework.stereotype.Service;
public class AgentClusterNodeOperator extends AbstractClusterNodeOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(AgentClusterNodeOperator.class);
-
+ private static final Gson GSON = new Gson();
@Autowired
private ObjectMapper objectMapper;
+ @Autowired
+ private InlongClusterEntityMapper clusterMapper;
+ @Autowired
+ private AgentTaskConfigEntityMapper agentTaskConfigEntityMapper;
+ @Autowired
+ private PackageConfigEntityMapper packageConfigEntityMapper;
+ @Autowired
+ private ModuleConfigEntityMapper moduleConfigEntityMapper;
@Override
public Boolean accept(String clusterNodeType) {
@@ -58,6 +94,45 @@ public class AgentClusterNodeOperator extends
AbstractClusterNodeOperator {
return ClusterType.AGENT;
}
+    /**
+     * Save a new Agent cluster node, then refresh the module config stored for its ip and cluster.
+     *
+     * @param request cluster node info to save
+     * @param operator name of the operator, recorded as creator and modifier
+     * @return id of the node entity after the insert
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public Integer saveOpt(ClusterNodeRequest request, String operator) {
+        InlongClusterNodeEntity nodeEntity = CommonBeanUtils.copyProperties(request, InlongClusterNodeEntity::new);
+        // fill in the ext params from the request
+        this.setTargetEntity(request, nodeEntity);
+        nodeEntity.setCreator(operator);
+        nodeEntity.setModifier(operator);
+        // a node is stored with the HEARTBEAT_TIMEOUT status when it is created
+        nodeEntity.setStatus(NodeStatus.HEARTBEAT_TIMEOUT.getStatus());
+        clusterNodeMapper.insert(nodeEntity);
+
+        // the name of the parent cluster is part of the identifier of the module config
+        InlongClusterEntity parentCluster = clusterMapper.selectById(request.getParentId());
+        String parentClusterName = parentCluster.getName();
+        String nodeIp = request.getIp();
+        updateModuleConfig(nodeIp, parentClusterName);
+        return nodeEntity.getId();
+    }
+
+    /**
+     * Update an existing Agent cluster node, then refresh the module config stored for its ip and cluster.
+     *
+     * @param request cluster node info, carrying the id of the node to update
+     * @param operator name of the operator, recorded as modifier
+     * @throws BusinessException with CONFIG_EXPIRED when the update does not affect exactly one row
+     */
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+    public void updateOpt(ClusterNodeRequest request, String operator) {
+        InlongClusterNodeEntity nodeEntity = clusterNodeMapper.selectById(request.getId());
+        // fill in the ext params from the request
+        this.setTargetEntity(request, nodeEntity);
+        nodeEntity.setModifier(operator);
+
+        if (clusterNodeMapper.updateByIdSelective(nodeEntity) != InlongConstants.AFFECTED_ONE_ROW) {
+            String expiredMsg = String.format(
+                    "cluster node has already updated with ip=%s, port=%s, protocolType=%s, type=%s, curVersion=%s",
+                    nodeEntity.getIp(), nodeEntity.getPort(), nodeEntity.getProtocolType(), nodeEntity.getType(),
+                    nodeEntity.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED, expiredMsg);
+        }
+
+        // the name of the parent cluster is part of the identifier of the module config
+        InlongClusterEntity parentCluster = clusterMapper.selectById(request.getParentId());
+        String parentClusterName = parentCluster.getName();
+        String nodeIp = request.getIp();
+        updateModuleConfig(nodeIp, parentClusterName);
+        LOGGER.debug("success to update inlong cluster node={}", request);
+    }
+
@Override
public ClusterNodeResponse getFromEntity(InlongClusterNodeEntity entity) {
if (entity == null) {
@@ -88,4 +163,95 @@ public class AgentClusterNodeOperator extends
AbstractClusterNodeOperator {
String.format("serialize extParams of Agent ClusterNode
failure: %s", e.getMessage()));
}
}
+
+    /**
+     * Recompute the installer module config of one agent node and store it in the agent task config
+     * row identified by the ip and the cluster name. The row is inserted when it does not exist yet,
+     * otherwise it is updated selectively.
+     *
+     * @param ip ip of the agent node, nothing is done when it is blank
+     * @param clusterName name of the agent cluster, nothing is done when it is blank
+     * @throws BusinessException if loading, serializing or saving the module config fails; the message
+     *         carries the cluster name, the ip and the message of the root cause
+     */
+    public void updateModuleConfig(String ip, String clusterName) {
+        if (StringUtils.isBlank(clusterName) || StringUtils.isBlank(ip)) {
+            LOGGER.info("no need to update module config when ip or cluster name is null");
+            return;
+        }
+        try {
+            LOGGER.info("begin to update module config for cluster name={}, ip={}", clusterName, ip);
+            ConfigRequest configRequest = new ConfigRequest();
+            configRequest.setLocalIp(ip);
+            configRequest.setClusterName(clusterName);
+            ConfigResult configResult = loadModuleConfigs(configRequest);
+
+            AgentTaskConfigEntity existEntity = agentTaskConfigEntityMapper.selectByIdentifier(ip, clusterName);
+            // start from a copy of the stored row when there is one
+            AgentTaskConfigEntity agentTaskConfigEntity = existEntity == null ? new AgentTaskConfigEntity()
+                    : CommonBeanUtils.copyProperties(existEntity, AgentTaskConfigEntity::new, true);
+            agentTaskConfigEntity.setAgentIp(ip);
+            agentTaskConfigEntity.setClusterName(clusterName);
+            agentTaskConfigEntity.setModuleParams(objectMapper.writeValueAsString(configResult));
+            if (existEntity == null) {
+                agentTaskConfigEntityMapper.insert(agentTaskConfigEntity);
+            } else {
+                agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
+            }
+            LOGGER.info("success to update module config for cluster name={}, ip={}", clusterName, ip);
+        } catch (Exception e) {
+            // keep the node identity and the root cause message in the error handed to the caller
+            String errMsg = String.format("load module config failed for cluster name=%s, ip=%s: %s",
+                    clusterName, ip, e.getMessage());
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
+    /**
+     * Build the installer config result for the node described by the request.
+     * The module list stays empty when the agent cluster or the node with the requested ip is not found.
+     *
+     * @param request installer config request, its cluster name must not be blank
+     * @return config result with the module list, the md5 of that list and the SUCCESS code
+     */
+    private ConfigResult loadModuleConfigs(ConfigRequest request) {
+        final String agentClusterName = request.getClusterName();
+        final String nodeIp = request.getLocalIp();
+        LOGGER.debug("begin to load config for installer = {}", request);
+        Preconditions.expectTrue(StringUtils.isNotBlank(agentClusterName), "cluster name is blank");
+
+        InlongClusterEntity agentCluster = clusterMapper.selectByNameAndType(agentClusterName, ClusterType.AGENT);
+        List<ModuleConfig> moduleList = new ArrayList<>();
+        List<InlongClusterNodeEntity> matchedNodes = agentCluster == null ? null
+                : clusterNodeMapper.selectByParentIdAndIp(agentCluster.getId(), nodeIp);
+        if (CollectionUtils.isNotEmpty(matchedNodes)) {
+            // only the first matched node is used
+            String nodeExtParams = matchedNodes.get(0).getExtParams();
+            moduleList = getModuleConfigs(AgentClusterNodeDTO.getFromJson(nodeExtParams));
+        }
+
+        // the md5 of the result is computed over the Gson form of the module list
+        String listMd5 = DigestUtils.md5Hex(GSON.toJson(moduleList));
+        ConfigResult result = ConfigResult.builder()
+                .moduleList(moduleList)
+                .md5(listMd5)
+                .code(AgentResponseCode.SUCCESS)
+                .build();
+        LOGGER.info("success load module config, size = {}", result.getModuleList().size());
+        return result;
+    }
+
+    /**
+     * Convert the module ids configured on an agent cluster node into installer module configs.
+     * Ids without a module config row are skipped. Every returned config carries the md5 of its own
+     * Gson form, computed right before the md5 is set.
+     *
+     * @param dto ext params of the agent cluster node
+     * @return module configs of the node, empty when the node has no module ids
+     */
+    private List<ModuleConfig> getModuleConfigs(AgentClusterNodeDTO dto) {
+        List<ModuleConfig> result = new ArrayList<>();
+        List<Integer> configuredIds = dto.getModuleIdList();
+        if (CollectionUtils.isEmpty(configuredIds)) {
+            return result;
+        }
+        for (Integer configuredId : configuredIds) {
+            ModuleConfigEntity moduleEntity = moduleConfigEntityMapper.selectByPrimaryKey(configuredId);
+            if (moduleEntity == null) {
+                continue;
+            }
+            String moduleType = moduleEntity.getType();
+            ModuleConfig module = CommonBeanUtils.copyProperties(moduleEntity, ModuleConfig::new);
+            // id is the module id of the module type, entityId is the primary key of the row
+            module.setId(ModuleType.forType(moduleType).getModuleId());
+            module.setEntityId(moduleEntity.getId());
+
+            PackageConfigEntity packageEntity =
+                    packageConfigEntityMapper.selectByPrimaryKey(moduleEntity.getPackageId());
+            module.setPackageConfig(CommonBeanUtils.copyProperties(packageEntity, PackageConfig::new));
+
+            ModuleDTO extParams = JsonUtils.parseObject(moduleEntity.getExtParams(), ModuleDTO.class);
+            module = CommonBeanUtils.copyProperties(extParams, module, true);
+
+            // the restart time comes from the node, depending on the type of the module
+            Integer restartTime = 0;
+            if (Objects.equals(moduleType, ModuleType.AGENT.name())) {
+                restartTime = dto.getAgentRestartTime();
+            }
+            if (Objects.equals(moduleType, ModuleType.INSTALLER.name())) {
+                restartTime = dto.getInstallRestartTime();
+            }
+            module.setRestartTime(restartTime);
+            module.setProcessesNum(1);
+            module.setMd5(DigestUtils.md5Hex(GSON.toJson(module)));
+            result.add(module);
+        }
+        return result;
+    }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java
index a491838c68..2dc504b5d1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConfigLoader.java
@@ -22,6 +22,8 @@ import
org.apache.inlong.manager.dao.entity.ClusterConfigEntity;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.apache.inlong.manager.dao.entity.ModuleConfigEntity;
+import org.apache.inlong.manager.dao.entity.PackageConfigEntity;
import org.apache.inlong.manager.dao.entity.SortConfigEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
@@ -132,4 +134,18 @@ public interface ConfigLoader {
*/
List<AgentTaskConfigEntity> loadAllAgentTaskConfigEntity();
+ /**
+ * Load all module config info
+ *
+ * @return List of module config info
+ */
+ List<ModuleConfigEntity> loadAllModuleConfigEntity();
+
+ /**
+ * Load all package config info
+ *
+ * @return List of package config info
+ */
+ List<PackageConfigEntity> loadAllPackageConfigEntity();
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index f7d669151a..595b53576b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -60,8 +60,6 @@ import
org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.dao.mapper.ModuleConfigEntityMapper;
-import org.apache.inlong.manager.dao.mapper.PackageConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
import
org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
@@ -71,21 +69,18 @@ import
org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.module.ModuleDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.cluster.node.AgentClusterNodeOperator;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.core.ConfigLoader;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
import org.apache.inlong.manager.service.source.StreamSourceOperator;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
-import lombok.Getter;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
@@ -111,14 +106,11 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -138,20 +130,12 @@ public class AgentServiceImpl implements AgentService {
private static final int MODULUS_100 = 100;
private static final int TASK_FETCH_SIZE = 2;
private static final Gson GSON = new Gson();
- private final ExecutorService executorService = new ThreadPoolExecutor(
- 5,
- 10,
- 10L,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(100),
- new ThreadFactoryBuilder().setNameFormat("async-agent-%s").build(),
- new CallerRunsPolicy());
+ private final LinkedBlockingQueue<ConfigRequest> updateModuleConfigQueue =
new LinkedBlockingQueue<>();
private Map<String, TaskResult> taskConfigMap = new ConcurrentHashMap<>();
private Map<String, AgentConfigInfo> agentConfigMap = new
ConcurrentHashMap<>();
-
- @Getter
- private LoadingCache<ConfigRequest, ConfigResult> moduleConfigCache;
+ private Map<Integer, ModuleConfig> moduleConfigMap = new
ConcurrentHashMap<>();
+ private Map<String, ConfigResult> installerConfigMap = new
ConcurrentHashMap<>();
@Value("${source.update.enabled:false}")
private Boolean updateTaskTimeoutEnabled;
@@ -169,6 +153,8 @@ public class AgentServiceImpl implements AgentService {
private Integer dataAddTaskCleanInterval;
@Value("${add.task.retention.days:7}")
private Integer retentionDays;
+ @Value("${default.module.id:1}")
+ private Integer defaultModuleId;
@Autowired
private StreamSourceEntityMapper sourceMapper;
@@ -187,11 +173,7 @@ public class AgentServiceImpl implements AgentService {
@Autowired
private SourceOperatorFactory operatorFactory;
@Autowired
- private ModuleConfigEntityMapper moduleConfigEntityMapper;
- @Autowired
- private PackageConfigEntityMapper packageConfigEntityMapper;
- @Autowired
- private InlongClusterService clusterService;
+ private AgentClusterNodeOperator agentClusterNodeOperator;
@Autowired
private ConfigLoader configLoader;
@@ -200,18 +182,6 @@ public class AgentServiceImpl implements AgentService {
*/
@PostConstruct
private void startHeartbeatTask() {
-
- // The expiry time of cluster info cache must be greater than
taskCache cache
- // because the eviction handler needs to query cluster info cache
- long expireTime = 10 * 5;
- LOGGER.debug("start to reload config for installer.");
- try {
- moduleConfigCache = Caffeine.newBuilder()
- .expireAfterWrite(expireTime * 2L, TimeUnit.SECONDS)
- .build(this::loadModuleConfigs);
- } catch (Throwable t) {
- LOGGER.error("fail to reload all config for installer ", t);
- }
try {
reload();
setReloadTimer();
@@ -302,10 +272,17 @@ public class AgentServiceImpl implements AgentService {
}
+ /**
+ * Reload the in-memory config in three steps. The order matters: updateModuleConfig reads the
+ * installer config map assigned by reloadAgentTask and the module config map assigned by reloadModule.
+ */
public void reload() {
+ // refresh the task, agent and installer config maps
+ reloadAgentTask();
+ // refresh the module config map
+ reloadModule();
+ // handle the queued installer config requests against the refreshed maps
+ updateModuleConfig();
+ }
+
+ public void reloadAgentTask() {
LOGGER.debug("start to reload agent task config.");
try {
Map<String, TaskResult> newTaskConfigMap = new
ConcurrentHashMap<>();
Map<String, AgentConfigInfo> newAgentConfigMap = new
ConcurrentHashMap<>();
+ Map<String, ConfigResult> newInstallerConfigMap = new
ConcurrentHashMap<>();
List<AgentTaskConfigEntity> agentTaskConfigEntityList =
configLoader.loadAllAgentTaskConfigEntity();
agentTaskConfigEntityList.forEach(agentTaskConfigEntity -> {
try {
@@ -323,6 +300,12 @@ public class AgentServiceImpl implements AgentService {
agentConfigInfo.setVersion(agentTaskConfigEntity.getVersion());
newAgentConfigMap.putIfAbsent(key, agentConfigInfo);
}
+ ConfigResult configResult =
+
JsonUtils.parseObject(agentTaskConfigEntity.getModuleParams(),
ConfigResult.class);
+ if (configResult != null) {
+
configResult.setVersion(agentTaskConfigEntity.getVersion());
+ newInstallerConfigMap.putIfAbsent(key, configResult);
+ }
} catch (Exception e) {
LOGGER.error("failed to get agent task config for agent
ip={}, cluster name={}",
agentTaskConfigEntity.getAgentIp(),
agentTaskConfigEntity.getClusterName());
@@ -331,12 +314,104 @@ public class AgentServiceImpl implements AgentService {
});
taskConfigMap = newTaskConfigMap;
agentConfigMap = newAgentConfigMap;
+ installerConfigMap = newInstallerConfigMap;
} catch (Throwable t) {
LOGGER.error("failed to reload all agent task config", t);
}
LOGGER.debug("end to reload agent task config");
}
+    /**
+     * Rebuild the module config map from all module config and package config entities returned by
+     * the config loader. The map is keyed by the id of the module config entity and replaces the
+     * previous map in one assignment.
+     * <p>
+     * A module that cannot be converted is logged and left out, so one bad row does not keep the
+     * other modules from being refreshed. Any other failure is logged and the previous map is kept.
+     */
+    public void reloadModule() {
+        LOGGER.debug("start to reload module config.");
+        try {
+            Map<Integer, ModuleConfig> newModuleConfigMap = new ConcurrentHashMap<>();
+            List<ModuleConfigEntity> moduleConfigEntityList = configLoader.loadAllModuleConfigEntity();
+            List<PackageConfigEntity> packageConfigEntityList = configLoader.loadAllPackageConfigEntity();
+            Map<Integer, PackageConfigEntity> packageConfigMap = new ConcurrentHashMap<>();
+            packageConfigEntityList.forEach(packageConfigEntity -> {
+                packageConfigMap.putIfAbsent(packageConfigEntity.getId(), packageConfigEntity);
+            });
+            moduleConfigEntityList.forEach(moduleConfigEntity -> {
+                try {
+                    ModuleConfig moduleConfig = CommonBeanUtils.copyProperties(moduleConfigEntity, ModuleConfig::new);
+                    // id is the module id of the module type, entityId is the primary key of the row
+                    moduleConfig.setId(ModuleType.forType(moduleConfigEntity.getType()).getModuleId());
+                    moduleConfig.setEntityId(moduleConfigEntity.getId());
+                    PackageConfigEntity packageConfigEntity = packageConfigMap.get(moduleConfigEntity.getPackageId());
+                    moduleConfig
+                            .setPackageConfig(CommonBeanUtils.copyProperties(packageConfigEntity, PackageConfig::new));
+                    ModuleDTO moduleDTO = JsonUtils.parseObject(moduleConfigEntity.getExtParams(), ModuleDTO.class);
+                    moduleConfig = CommonBeanUtils.copyProperties(moduleDTO, moduleConfig, true);
+                    moduleConfig.setProcessesNum(1);
+                    newModuleConfigMap.putIfAbsent(moduleConfigEntity.getId(), moduleConfig);
+                } catch (Exception e) {
+                    // e.g. a null package id is rejected by the concurrent map lookup
+                    LOGGER.error("failed to load module config for module id={}", moduleConfigEntity.getId(), e);
+                }
+            });
+            moduleConfigMap = newModuleConfigMap;
+        } catch (Throwable t) {
+            LOGGER.error("fail to reload module config", t);
+        }
+        LOGGER.debug("end to reload module config");
+    }
+
+    /**
+     * Handle the installer config requests queued by getConfig: for every drained request the module
+     * config is rebuilt from the module config map and, when its md5 differs from the md5 of the cached
+     * installer config (or no installer config is cached yet), the stored module config of that node is
+     * refreshed through the agent cluster node operator.
+     * <p>
+     * Failures are logged and not rethrown.
+     * NOTE(review): reload() calls this method on the same instance; with proxy based Spring transactions
+     * such a self-invocation presumably bypasses the Transactional advice, confirm whether a transaction is
+     * really needed here.
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public void updateModuleConfig() {
+        LOGGER.info("start to update module config.");
+        try {
+            LinkedBlockingQueue<ConfigRequest> tempQueue = new LinkedBlockingQueue<>();
+            if (updateModuleConfigQueue.isEmpty()) {
+                return;
+            }
+            int moveNum = updateModuleConfigQueue.drainTo(tempQueue);
+            LOGGER.info("begin to update module config source size={}, target size={}, move num={}",
+                    updateModuleConfigQueue.size(), tempQueue.size(), moveNum);
+
+            // The guard does not depend on the request, so it is evaluated once instead of per request.
+            // NOTE(review): returning when the default module IS present looks inverted ("== null" would
+            // match the fallback to defaultModuleId below) - kept as is, confirm the intent.
+            if (moduleConfigMap.isEmpty() || moduleConfigMap.get(defaultModuleId) != null) {
+                return;
+            }
+
+            while (!tempQueue.isEmpty()) {
+                ConfigRequest configRequest = tempQueue.poll();
+                String ip = configRequest.getLocalIp();
+                String clusterName = configRequest.getClusterName();
+                String key = ip + InlongConstants.UNDERSCORE + clusterName;
+                ConfigResult configResult = installerConfigMap.get(key);
+
+                Integer restartTime = 0;
+                List<Integer> moduleIdList = new ArrayList<>();
+                if (configResult == null) {
+                    // nothing cached for this installer yet: fall back to the default module
+                    moduleIdList.add(defaultModuleId);
+                } else if (CollectionUtils.isNotEmpty(configResult.getModuleList())) {
+                    // a cached result without modules contributes no module ids
+                    restartTime = configResult.getModuleList().get(0).getRestartTime();
+                    for (ModuleConfig moduleConfig : configResult.getModuleList()) {
+                        moduleIdList.add(moduleConfig.getEntityId());
+                    }
+                }
+
+                List<ModuleConfig> configs = new ArrayList<>();
+                for (Integer moduleId : moduleIdList) {
+                    // the concurrent map rejects null keys, so an absent entity id is skipped
+                    ModuleConfig cachedConfig = moduleId == null ? null : moduleConfigMap.get(moduleId);
+                    if (cachedConfig == null) {
+                        continue;
+                    }
+                    // work on a copy: the cached instance is shared by all requests, setting the restart
+                    // time and the md5 on it would leak into the md5 computed for the next request
+                    ModuleConfig moduleConfig = CommonBeanUtils.copyProperties(cachedConfig, ModuleConfig::new);
+                    moduleConfig.setRestartTime(restartTime);
+                    moduleConfig.setMd5(DigestUtils.md5Hex(GSON.toJson(moduleConfig)));
+                    configs.add(moduleConfig);
+                }
+
+                String configMd5 = DigestUtils.md5Hex(GSON.toJson(configs));
+                if (configResult == null || !Objects.equals(configResult.getMd5(), configMd5)) {
+                    agentClusterNodeOperator.updateModuleConfig(ip, clusterName);
+                }
+            }
+        } catch (Throwable t) {
+            LOGGER.error("fail to update module config", t);
+        }
+        LOGGER.info("end to update module config");
+    }
+
/**
* Update task status by command.
*
@@ -417,6 +492,11 @@ public class AgentServiceImpl implements AgentService {
LOGGER.debug("begin to get all exist task by request={}", request);
String key = request.getAgentIp() + InlongConstants.UNDERSCORE +
request.getClusterName();
TaskResult taskResult = taskConfigMap.get(key);
+ if (taskResult == null) {
+ // When an agent is deployed in a container, tasks do not need to
specify an IP address
+ key = "All" + InlongConstants.UNDERSCORE +
request.getClusterName();
+ taskResult = taskConfigMap.get(key);
+ }
if (taskResult == null) {
return null;
}
@@ -499,7 +579,12 @@ public class AgentServiceImpl implements AgentService {
@Override
public ConfigResult getConfig(ConfigRequest request) {
- ConfigResult configResult = moduleConfigCache.get(request);
+ if (!updateModuleConfigQueue.contains(request)) {
+ updateModuleConfigQueue.add(request);
+ }
+ String key = request.getLocalIp() + InlongConstants.UNDERSCORE +
request.getClusterName();
+
+ ConfigResult configResult = installerConfigMap.get(key);
if (configResult == null) {
LOGGER.debug(String.format("can not get config result for cluster
name=%s, ip=%s", request.getClusterName(),
request.getLocalIp()));
@@ -871,59 +956,4 @@ public class AgentServiceImpl implements AgentService {
return sourceGroups.stream().anyMatch(clusterNodeGroups::contains);
}
- private ConfigResult loadModuleConfigs(ConfigRequest request) {
- final String clusterName = request.getClusterName();
- final String ip = request.getLocalIp();
- LOGGER.debug("begin to load config for installer = {}", request);
- Preconditions.expectTrue(StringUtils.isNotBlank(clusterName), "cluster
name is blank");
- InlongClusterEntity clusterEntity =
clusterMapper.selectByNameAndType(clusterName, ClusterType.AGENT);
- List<InlongClusterNodeEntity> clusterNodeEntityList =
- clusterNodeMapper.selectByParentIdAndIp(clusterEntity.getId(),
ip);
- List<ModuleConfig> configs = new ArrayList<>();
- if (CollectionUtils.isNotEmpty(clusterNodeEntityList)) {
- AgentClusterNodeDTO dto =
AgentClusterNodeDTO.getFromJson(clusterNodeEntityList.get(0).getExtParams());
- configs = getModuleConfigs(dto);
- }
- String jsonStr = GSON.toJson(configs);
- String configMd5 = DigestUtils.md5Hex(jsonStr);
-
- ConfigResult configResult =
ConfigResult.builder().moduleList(configs).md5(configMd5)
- .code(AgentResponseCode.SUCCESS)
- .build();
- LOGGER.info("success load module config, size = {}",
configResult.getModuleList().size());
- return configResult;
- }
-
- private List<ModuleConfig> getModuleConfigs(AgentClusterNodeDTO dto) {
- List<Integer> moduleIdList = dto.getModuleIdList();
- List<ModuleConfig> configs = new ArrayList<>();
- if (CollectionUtils.isEmpty(moduleIdList)) {
- return configs;
- }
- for (Integer moduleId : moduleIdList) {
- ModuleConfigEntity moduleConfigEntity =
moduleConfigEntityMapper.selectByPrimaryKey(moduleId);
- ModuleConfig moduleConfig =
CommonBeanUtils.copyProperties(moduleConfigEntity, ModuleConfig::new);
-
moduleConfig.setId(ModuleType.forType(moduleConfigEntity.getType()).getModuleId());
- PackageConfigEntity packageConfigEntity =
-
packageConfigEntityMapper.selectByPrimaryKey(moduleConfigEntity.getPackageId());
- moduleConfig
-
.setPackageConfig(CommonBeanUtils.copyProperties(packageConfigEntity,
PackageConfig::new));
- ModuleDTO moduleDTO =
JsonUtils.parseObject(moduleConfigEntity.getExtParams(), ModuleDTO.class);
- moduleConfig = CommonBeanUtils.copyProperties(moduleDTO,
moduleConfig, true);
- Integer restartTime = 0;
- if (Objects.equals(moduleConfigEntity.getType(),
ModuleType.AGENT.name())) {
- restartTime = dto.getAgentRestartTime();
- }
- if (Objects.equals(moduleConfigEntity.getType(),
ModuleType.INSTALLER.name())) {
- restartTime = dto.getInstallRestartTime();
- }
- moduleConfig.setRestartTime(restartTime);
- String moduleStr = GSON.toJson(moduleConfig);
- String moduleMd5 = DigestUtils.md5Hex(moduleStr);
- moduleConfig.setMd5(moduleMd5);
- moduleConfig.setProcessesNum(1);
- configs.add(moduleConfig);
- }
- return configs;
- }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java
index 649586bb21..f9cd08d16d 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConfigLoaderImpl.java
@@ -22,6 +22,8 @@ import
org.apache.inlong.manager.dao.entity.ClusterConfigEntity;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.apache.inlong.manager.dao.entity.ModuleConfigEntity;
+import org.apache.inlong.manager.dao.entity.PackageConfigEntity;
import org.apache.inlong.manager.dao.entity.SortConfigEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.AgentTaskConfigEntityMapper;
@@ -32,6 +34,8 @@ import
org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
+import org.apache.inlong.manager.dao.mapper.ModuleConfigEntityMapper;
+import org.apache.inlong.manager.dao.mapper.PackageConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
@@ -76,6 +80,10 @@ public class ConfigLoaderImpl implements ConfigLoader {
private ClusterConfigEntityMapper clusterConfigEntityMapper;
@Autowired
private AgentTaskConfigEntityMapper agentTaskConfigEntityMapper;
+ @Autowired
+ private ModuleConfigEntityMapper moduleConfigEntityMapper;
+ @Autowired
+ private PackageConfigEntityMapper packageConfigEntityMapper;
@Transactional
@Override
@@ -193,4 +201,23 @@ public class ConfigLoaderImpl implements ConfigLoader {
cursor.forEach(agentTaskConfigEntityList::add);
return agentTaskConfigEntityList;
}
+
+ @Transactional // NOTE(review): presumably required so the cursor is read inside an open session - confirm
+ @Override
+ public List<ModuleConfigEntity> loadAllModuleConfigEntity() { // returns every entity yielded by selectAllModuleConfigs()
+ Cursor<ModuleConfigEntity> cursor =
moduleConfigEntityMapper.selectAllModuleConfigs();
+ List<ModuleConfigEntity> moduleConfigEntityList = new ArrayList<>();
+ cursor.forEach(moduleConfigEntityList::add); // drain the cursor into the list; the cursor is not closed explicitly here
+ return moduleConfigEntityList; // never null; empty when the cursor yields no rows
+ }
+
+ @Transactional // NOTE(review): presumably required so the cursor is read inside an open session - confirm
+ @Override
+ public List<PackageConfigEntity> loadAllPackageConfigEntity() { // returns every entity yielded by selectAllPackageConfigs()
+ Cursor<PackageConfigEntity> cursor =
packageConfigEntityMapper.selectAllPackageConfigs();
+ List<PackageConfigEntity> packageConfigEntityList = new ArrayList<>();
+ cursor.forEach(packageConfigEntityList::add); // drain the cursor into the list; the cursor is not closed explicitly here
+ return packageConfigEntityList; // never null; empty when the cursor yields no rows
+ }
+
}
diff --git
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 531a639d4a..a1bff1615c 100644
---
a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++
b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -908,7 +908,8 @@ CREATE TABLE IF NOT EXISTS `agent_task_config`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
`config_params` text DEFAULT NULL COMMENT 'The agent config
params',
- `task_params` text NOT NULL COMMENT 'The agent task
config params',
+ `task_params` text DEFAULT NULL COMMENT 'The agent task
config params',
+ `module_params` text DEFAULT NULL COMMENT 'The module
config params',
`agent_ip` varchar(128) NOT NULL COMMENT 'agent ip',
`cluster_name` varchar(128) NOT NULL COMMENT 'Inlong cluster name',
`creator` varchar(128) DEFAULT NULL COMMENT 'Creator',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 430982df28..a1388e6368 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -959,7 +959,8 @@ CREATE TABLE IF NOT EXISTS `agent_task_config`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
`config_params` text DEFAULT NULL COMMENT 'The agent config
params',
- `task_params` text NOT NULL COMMENT 'The agent task
config params',
+ `task_params` text DEFAULT NULL COMMENT 'The agent task
config params',
+ `module_params` text DEFAULT NULL COMMENT 'The module
config params',
`agent_ip` varchar(128) NOT NULL COMMENT 'agent ip',
`cluster_name` varchar(128) NOT NULL COMMENT 'Inlong cluster name',
`creator` varchar(128) DEFAULT NULL COMMENT 'Creator',
diff --git a/inlong-manager/manager-web/sql/changes-1.13.0.sql
b/inlong-manager/manager-web/sql/changes-1.13.0.sql
index 166c98f199..8e53c38ac2 100644
--- a/inlong-manager/manager-web/sql/changes-1.13.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.13.0.sql
@@ -126,7 +126,8 @@ CREATE TABLE IF NOT EXISTS `agent_task_config`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT
'Incremental primary key',
`config_params` text DEFAULT NULL COMMENT 'The agent config
params',
- `task_params` text NOT NULL COMMENT 'The agent task
config params',
+ `task_params` text DEFAULT NULL COMMENT 'The agent task
config params',
+ `module_params` text DEFAULT NULL COMMENT 'The module
config params',
`agent_ip` varchar(128) NOT NULL COMMENT 'agent ip',
`cluster_name` varchar(128) NOT NULL COMMENT 'Inlong cluster name',
`creator` varchar(128) DEFAULT NULL COMMENT 'Creator',
diff --git
a/inlong-manager/manager-web/src/main/resources/application-dev.properties
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 60f82953a5..0f2392ab2f 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -99,7 +99,8 @@ cls.manager.endpoint=127.0.0.1
manager.url=127.0.0.1:8083
agent.install.path=
-
+# The primary key id of the default agent module used
+default.module.id=1
# schedule engine type
# support none(no scheduler) and quartz(quartz scheduler), default is none
inlong.schedule.engine=none
\ No newline at end of file