This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 1f97f202108731d7bf31381d3f3e404fd46fabc2 Author: woofyzhao <[email protected]> AuthorDate: Thu Aug 25 16:04:36 2022 +0800 [INLONG-5691][Manager] Remove the append operation for agent IPs (#5692) --- .../manager/dao/mapper/StreamSourceEntityMapper.java | 2 -- .../resources/mappers/StreamSourceEntityMapper.xml | 5 ----- .../manager/service/core/impl/AgentServiceImpl.java | 20 ++++++++------------ .../src/main/resources/h2/apache_inlong_manager.sql | 5 +++-- .../manager-web/sql/apache_inlong_manager.sql | 5 +++-- 5 files changed, 14 insertions(+), 23 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java index eabd61220..3b0144d3d 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java @@ -129,8 +129,6 @@ public interface StreamSourceEntityMapper { int updateSnapshot(StreamSourceEntity entity); - int appendAgentIp(@Param("id") Integer id, @Param("agentIp") String agentIp); - /** * Physical delete stream sources. */ diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml index 32f8c56ef..c1b955f57 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml @@ -390,11 +390,6 @@ modify_time = modify_time where id = #{id,jdbcType=INTEGER} </update> - <update id="appendAgentIp"> - update stream_source - set agent_ip = IF(agent_ip is NULL or agent_ip = '', #{agentIp,jdbcType=VARCHAR}, CONCAT(agent_ip, ',', #{agentIp})) - where id = #{id,jdbcType=INTEGER} - </update> <delete id="deleteByRelatedId"> delete diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 94ec64d61..91b3f7238 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -30,12 +30,12 @@ import org.apache.inlong.common.pojo.agent.DataConfig; import org.apache.inlong.common.pojo.agent.TaskRequest; import org.apache.inlong.common.pojo.agent.TaskResult; import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest; -import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.SourceStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper; @@ -212,6 +212,8 @@ public class AgentServiceImpl implements AgentService { final String agentIp = taskRequest.getAgentIp(); final String agentClusterName = taskRequest.getClusterName(); final String uuid = taskRequest.getUuid(); + Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName), + "both agent ip and cluster name are blank when fetching file task"); List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList, Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName,TASK_FETCH_SIZE * 10); List<DataConfig> fileTasks = Lists.newArrayList(); @@ -235,18 +237,15 @@ public class AgentServiceImpl implements AgentService { continue; } - // Cluster name is not blank, split task if necessary - // The agent ip field of the entity holds the ip list of the agents that has already been issued + // Cluster name is not blank, split subtask if necessary + // The template task's id is assigned to the subtask's template id field if (StringUtils.isNotBlank(destClusterName) && destClusterName.equals(agentClusterName) && Objects.isNull(sourceEntity.getTemplateId())) { // Is the task already fetched by this agent ? - if (StringUtils.isNotBlank(sourceEntity.getAgentIp())) { - if (Arrays.asList(sourceEntity.getAgentIp().split(InlongConstants.COMMA)).contains(agentIp)) { - LOGGER.debug("Task={} has already been fetched by agentIP={}", sourceEntity.getExtParams(), - agentIp); - continue; - } + List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId()); + if (subSources.stream().anyMatch(subSource -> subSource.getAgentIp().equals(agentIp))) { + continue; } // If not, clone a sub task for the new agent @@ -264,9 +263,6 @@ public class AgentServiceImpl implements AgentService { if (sourceMapper.insert(fileEntity) > 0) { fileTasks.add(getDataConfig(fileEntity, op)); } - - // Append new agent ip - sourceMapper.appendAgentIp(sourceEntity.getId(), agentIp); } if (fileTasks.size() >= TASK_FETCH_SIZE) { break; diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql index 2d7753dfb..99a22413c 100644 --- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql +++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql @@ -382,7 +382,7 @@ CREATE TABLE IF NOT EXISTS `stream_source` `source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name', `source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc', `template_id` int(11) DEFAULT NULL COMMENT 'Id of the template task this agent belongs to', - `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task', + `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task, NULL if this is a template task', `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table', `inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task', @@ -401,7 +401,8 @@ CREATE TABLE IF NOT EXISTS `stream_source` PRIMARY KEY (`id`), UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`), KEY `source_status_index` (`status`, `is_deleted`), - KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`) + KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`), + KEY `template_id_index` (`template_id`) ); -- ---------------------------- diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql index 0c249a2ff..e7bbf585b 100644 --- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql +++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql @@ -403,7 +403,7 @@ CREATE TABLE IF NOT EXISTS `stream_source` `source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name', `source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc', `template_id` int(11) DEFAULT NULL COMMENT 'Id of the template task this agent belongs to', - `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task', + `agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task, NULL if this is a template task', `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table', `inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task', @@ -422,7 +422,8 @@ CREATE TABLE IF NOT EXISTS `stream_source` PRIMARY KEY (`id`), UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`), KEY `source_status_index` (`status`, `is_deleted`), - KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`) + KEY `source_agent_ip_index` (`agent_ip`, `is_deleted`), + KEY `template_id_index` (`template_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
