This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 0db3d054f79277fcdd089826d9791119a4d31cd9 Author: fuweng11 <[email protected]> AuthorDate: Tue Nov 15 12:38:26 2022 +0800 [INLONG-6533][Manager] Fix the problem of not distributing the task according to the cluster name (#6534) * Fix the problem of not distributing the task according to the cluster name * Extract a constant to improve the code Co-authored-by: healchow <[email protected]> --- .../dao/mapper/StreamSourceEntityMapper.java | 6 ++-- .../resources/mappers/StreamSourceEntityMapper.xml | 15 +++++---- .../service/core/impl/AgentServiceImpl.java | 36 ++++++++++++---------- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java index aee466d88..caa80bcc9 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java @@ -81,12 +81,12 @@ public interface StreamSourceEntityMapper { @Param("clusterName") String clusterName); /** - * Query the sources with status 20x by the given agent IP and agent UUID. + * Query the sources by the given status and Agent cluster info. * * @apiNote Sources with is_deleted > 0 should also be returned to agents to clear their local tasks. */ - List<StreamSourceEntity> selectByStatusAndIp(@Param("statusList") List<Integer> statusList, - @Param("agentIp") String agentIp, @Param("uuid") String uuid); + List<StreamSourceEntity> selectByStatusAndCluster(@Param("statusList") List<Integer> statusList, + @Param("clusterName") String clusterName, @Param("agentIp") String agentIp, @Param("uuid") String uuid); /** * Select all sources by groupIds diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml index e1e7f56a8..97ef663fe 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml @@ -224,18 +224,17 @@ and (agent_ip = #{agentIp, jdbcType=VARCHAR} or inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}) </where> </select> - <select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> + <select id="selectByStatusAndCluster" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> select <include refid="Base_Column_List"/> from stream_source <where> - agent_ip = #{agentIp, jdbcType=VARCHAR} - <if test="statusList != null and statusList.size()>0"> - and status in - <foreach item="item" index="index" collection="statusList" open="(" close=")" separator=","> - #{item} - </foreach> - </if> + inlong_cluster_name = #{clusterName, jdbcType=VARCHAR} + and agent_ip = #{agentIp, jdbcType=VARCHAR} + and `status` in + <foreach item="item" index="index" collection="statusList" open="(" close=")" separator=","> + #{item} + </foreach> <if test="uuid != null and uuid != ''"> and uuid = #{uuid, jdbcType=VARCHAR} </if> diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index d091bdf7e..29f0dcbff 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -71,6 +71,18 @@ public class AgentServiceImpl implements AgentService { private static final int MODULUS_100 = 100; private static final int TASK_FETCH_SIZE = 2; + /** + * Need issued status list, not included status with TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE + */ + private static final List<Integer> NEED_ISSUED_STATUS = Arrays.asList( + SourceStatus.TO_BE_ISSUED_DELETE.getCode(), + SourceStatus.TO_BE_ISSUED_RETRY.getCode(), + SourceStatus.TO_BE_ISSUED_BACKTRACK.getCode(), + SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), + SourceStatus.TO_BE_ISSUED_CHECK.getCode(), + SourceStatus.TO_BE_ISSUED_REDO_METRIC.getCode(), + SourceStatus.TO_BE_ISSUED_MAKEUP.getCode()); + @Autowired private StreamSourceEntityMapper sourceMapper; @Autowired @@ -153,20 +165,17 @@ public class AgentServiceImpl implements AgentService { @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public TaskResult getTaskResult(TaskRequest request) { - if (request == null || StringUtils.isBlank(request.getAgentIp())) { + if (StringUtils.isBlank(request.getClusterName()) || StringUtils.isBlank(request.getAgentIp())) { throw new BusinessException("agent request or agent ip was empty, just return"); } List<DataConfig> tasks = Lists.newArrayList(); // Query the tasks that needed to add or active - without agentIp and uuid - List<DataConfig> nonFileTasks = fetchNonFileTasks(request); - tasks.addAll(nonFileTasks); + tasks.addAll(fetchNonFileTasks(request)); // Query file collecting tasks - List<DataConfig> fileTasks = fetchFileTasks(request); - tasks.addAll(fileTasks); - // Query other tasks by agentIp and uuid - not included status with TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE - List<DataConfig> needIssuedTasks = fetchIssuedTasks(request); - tasks.addAll(needIssuedTasks); + tasks.addAll(fetchFileTasks(request)); + // Query other tasks by agentIp and uuid + tasks.addAll(fetchNeedIssuedTasks(request)); // Query pending special commands List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request); @@ -281,14 +290,9 @@ public class AgentServiceImpl implements AgentService { return null; } - private List<DataConfig> fetchIssuedTasks(TaskRequest taskRequest) { - final String agentIp = taskRequest.getAgentIp(); - final String uuid = taskRequest.getUuid(); - List<Integer> statusList = Arrays.asList(SourceStatus.TO_BE_ISSUED_DELETE.getCode(), - SourceStatus.TO_BE_ISSUED_RETRY.getCode(), SourceStatus.TO_BE_ISSUED_BACKTRACK.getCode(), - SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), SourceStatus.TO_BE_ISSUED_CHECK.getCode(), - SourceStatus.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceStatus.TO_BE_ISSUED_MAKEUP.getCode()); - List<StreamSourceEntity> sourceEntities = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid); + private List<DataConfig> fetchNeedIssuedTasks(TaskRequest taskRequest) { + List<StreamSourceEntity> sourceEntities = sourceMapper.selectByStatusAndCluster(NEED_ISSUED_STATUS, + taskRequest.getClusterName(), taskRequest.getAgentIp(), taskRequest.getUuid()); List<DataConfig> issuedTasks = Lists.newArrayList(); for (StreamSourceEntity issuedTask : sourceEntities) { int op = getOp(issuedTask.getStatus());
