This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 71e51083c [INLONG-7619][Manager] Support update and retry MySQL 
sources after updating MySQLDataNode (#7621)
71e51083c is described below

commit 71e51083cb4a0e114da6bffd6b87b571e1021cbc
Author: fuweng11 <[email protected]>
AuthorDate: Thu Mar 16 18:43:17 2023 +0800

    [INLONG-7619][Manager] Support update and retry MySQL sources after 
updating MySQLDataNode (#7621)
---
 .../inlong/manager/common/enums/GroupStatus.java   |  8 +++++
 .../dao/mapper/StreamSourceEntityMapper.java       | 16 +++++++++
 .../resources/mappers/StreamSourceEntityMapper.xml | 42 +++++++++++++++++++++-
 .../service/node/AbstractDataNodeOperator.java     | 29 +++++++++++++--
 .../manager/service/node/DataNodeOperator.java     |  4 ++-
 .../manager/service/node/DataNodeServiceImpl.java  |  3 +-
 .../service/node/mysql/MySQLDataNodeOperator.java  | 15 ++++++++
 7 files changed, 112 insertions(+), 5 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index 565b43576..c265a0ba8 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -126,6 +126,14 @@ public enum GroupStatus {
                 || status == GroupStatus.CONFIG_FAILED;
     }
 
+    /**
+     * Tells whether stream sources may be updated while the group is in the given status.
+     *
+     * @param status the group status to check
+     * @return true only when the status is CONFIG_SUCCESSFUL or CONFIG_FAILED
+     */
+    public static boolean allowedUpdateSource(GroupStatus status) {
+        if (status == GroupStatus.CONFIG_FAILED) {
+            return true;
+        }
+        return status == GroupStatus.CONFIG_SUCCESSFUL;
+    }
+
     /**
      * Checks whether the given status needs to delete the inlong stream first.
      */
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index ba7ad89d1..99a06ac34 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -119,6 +119,12 @@ public interface StreamSourceEntityMapper {
      */
     List<String> selectSourceType(@Param("groupId") String groupId, 
@Param("streamId") String streamId);
 
+    /**
+     * Query the ids of the stream sources that need to be updated, filtered by
+     * cluster name, data node name and source type.
+     * A filter whose argument is null is not applied.
+     *
+     * @param clusterName cluster name of the source, nullable
+     * @param nodeName data node name of the source, nullable
+     * @param sourceType source type, nullable
+     * @return ids of the matching stream sources
+     */
+    List<Integer> 
selectNeedUpdateIdsByClusterAndDataNode(@Param("clusterName") String 
clusterName,
+            @Param("nodeName") String nodeName, @Param("sourceType") String 
sourceType);
+
     int updateByPrimaryKeySelective(StreamSourceEntity record);
 
     int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") 
String streamId,
@@ -148,6 +154,16 @@ public interface StreamSourceEntityMapper {
 
     int updateSnapshot(StreamSourceEntity entity);
 
+    /**
+     * Update the source status.
+     * The current status is saved as the previous status, the modifier is set to
+     * the operator, and the version is incremented by one. Sources that are
+     * marked as deleted are not updated.
+     *
+     * @param idList source id list
+     * @param status modify the status to this
+     * @param operator operator name
+     */
+    void updateStatusByIds(@Param("idList") List<Integer> idList, 
@Param("status") Integer status,
+            @Param("operator") String operator);
+
     /**
      * Physical delete stream sources by group id and stream id
      */
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 0f6aeb5bb..7c2df0199 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -319,7 +319,29 @@
             for update
         </where>
     </select>
-
+    <!--
+        Select the ids of non-deleted stream sources whose status is not 99 or 110 and whose
+        own group and own stream are both non-deleted and in status 120 or 130.
+        The stream is joined on both the group id and the stream id, so that a source is only
+        matched against the stream it belongs to and each source id is returned once.
+        The cluster name, data node name and source type filters apply only when not null.
+    -->
+    <select id="selectNeedUpdateIdsByClusterAndDataNode" resultType="java.lang.Integer">
+        select source.id
+        from stream_source source, inlong_stream stream, inlong_group inlong_group
+        <where>
+            source.is_deleted = 0
+            and inlong_group.inlong_group_id = source.inlong_group_id
+            and inlong_group.is_deleted = 0
+            and inlong_group.status in (120, 130)
+            and stream.inlong_group_id = source.inlong_group_id
+            and stream.inlong_stream_id = source.inlong_stream_id
+            and stream.is_deleted = 0
+            and stream.status in (120, 130)
+            and source.status not in (99, 110)
+            <if test="clusterName != null">
+                and source.inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
+            </if>
+            <if test="nodeName != null">
+                and source.data_node_name = #{nodeName, jdbcType=VARCHAR}
+            </if>
+            <if test="sourceType != null">
+                and source.source_type = #{sourceType, jdbcType=VARCHAR}
+            </if>
+        </where>
+    </select>
     <update id="updateByRelatedId">
         update stream_source
         <set>
@@ -458,6 +480,24 @@
             modify_time = modify_time
         where id = #{id,jdbcType=INTEGER}
     </update>
+    <!--
+        Set the status of the non-deleted sources whose id is in idList, keeping the old
+        status in previous_status and bumping the version.
+        When idList is null or empty the statement must match no row at all: without the
+        "otherwise" branch the where clause would shrink to "is_deleted = 0" and every
+        live source would be updated.
+    -->
+    <update id="updateStatusByIds">
+        update stream_source
+        <set>
+            previous_status = status,
+            status          = #{status, jdbcType=INTEGER},
+            modifier        = #{operator, jdbcType=VARCHAR},
+            version         = version + 1
+        </set>
+        <where>
+            is_deleted = 0
+            <choose>
+                <when test="idList != null and idList.size() > 0">
+                    and id in
+                    <foreach item="item" index="index" collection="idList" open="(" close=")" separator=",">
+                        #{item}
+                    </foreach>
+                </when>
+                <otherwise>
+                    and 1 = 0
+                </otherwise>
+            </choose>
+        </where>
+    </update>
 
     <delete id="deleteByRelatedId">
         delete
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
index a2e4c2eb1..efa174fd9 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -19,11 +19,15 @@ package org.apache.inlong.manager.service.node;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 import org.slf4j.Logger;
@@ -33,6 +37,7 @@ import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -44,6 +49,12 @@ public abstract class AbstractDataNodeOperator implements 
DataNodeOperator {
 
     @Autowired
     protected DataNodeEntityMapper dataNodeEntityMapper;
+    @Autowired
+    protected StreamSourceEntityMapper sourceMapper;
+    @Autowired
+    protected InlongGroupEntityMapper groupMapper;
+    @Autowired
+    protected InlongStreamEntityMapper streamMapper;
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
@@ -72,7 +83,6 @@ public abstract class AbstractDataNodeOperator implements 
DataNodeOperator {
         DataNodeEntity entity = CommonBeanUtils.copyProperties(request, 
DataNodeEntity::new);
         // set the ext params
         this.setTargetEntity(request, entity);
-        this.updateRelatedStreamSource(request);
         entity.setModifier(operator);
         int rowCount = dataNodeEntityMapper.updateByIdSelective(entity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
@@ -95,7 +105,22 @@ public abstract class AbstractDataNodeOperator implements 
DataNodeOperator {
     }
 
     @Override
-    public void updateRelatedStreamSource(DataNodeRequest request) {
+    public void updateRelatedStreamSource(DataNodeRequest request, 
DataNodeEntity entity, String operator) {
         LOGGER.info("do nothing for the data node type ={}", 
request.getType());
     }
+
+    public void retryStreamSourceByDataNodeNameAndType(String dataNodeName, 
String type, String operator) {
+        Integer status = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
+        LOGGER.info("begin to update stream source status by dataNodeName={}, 
status={}, by operator={}",
+                dataNodeName, status, operator);
+        List<Integer> needUpdateIds = 
sourceMapper.selectNeedUpdateIdsByClusterAndDataNode(null, dataNodeName, type);
+        try {
+            sourceMapper.updateStatusByIds(needUpdateIds, status, operator);
+            LOGGER.info("success to update stream source status by 
dataNodeName={}, status={}, by operator={}",
+                    dataNodeName, status, operator);
+        } catch (Exception e) {
+            LOGGER.error("failed to update stream source status by 
dataNodeName={}, status={}, by operator={}",
+                    dataNodeName, status, operator, e);
+        }
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
index a0618f4bb..58a12b732 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -83,7 +83,9 @@ public interface DataNodeOperator {
      * Update related stream source.
      *
      * @param request data node request
+     * @param entity data node entity
+     * @param operator operator
      */
-    void updateRelatedStreamSource(DataNodeRequest request);
+    void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity 
entity, String operator);
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index d44f32d03..b2d094e64 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -200,7 +200,7 @@ public class DataNodeServiceImpl implements DataNodeService 
{
         }
         DataNodeOperator dataNodeOperator = 
operatorFactory.getInstance(request.getType());
         dataNodeOperator.updateOpt(request, operator);
-
+        dataNodeOperator.updateRelatedStreamSource(request, curEntity, 
operator);
         LOGGER.info("success to update data node={}", request);
         return true;
     }
@@ -238,6 +238,7 @@ public class DataNodeServiceImpl implements DataNodeService 
{
         }
         DataNodeOperator dataNodeOperator = 
operatorFactory.getInstance(request.getType());
         dataNodeOperator.updateOpt(request, opInfo.getName());
+        dataNodeOperator.updateRelatedStreamSource(request, curEntity, 
opInfo.getName());
         return true;
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index 7bc1f8f4b..f3e108ab5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.node.mysql;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -38,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.sql.Connection;
+import java.util.Objects;
 
 /**
  * MySQL data node operator
@@ -106,4 +108,17 @@ public class MySQLDataNodeOperator extends 
AbstractDataNodeOperator {
         }
     }
 
+    /**
+     * Retry the related MySQL stream sources when the connection settings in the request
+     * differ from those read from the given entity.
+     * <p/>
+     * The url, backup url, username and token are compared; if all are equal nothing is done.
+     *
+     * @param request data node request, must be a MySQLDataNodeRequest
+     * @param entity data node entity to compare the request against
+     * @param operator name of the operator
+     */
+    @Override
+    public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) {
+        MySQLDataNodeRequest updated = (MySQLDataNodeRequest) request;
+        MySQLDataNodeInfo current = (MySQLDataNodeInfo) this.getFromEntity(entity);
+        boolean unchanged = Objects.equals(updated.getUrl(), current.getUrl())
+                && Objects.equals(updated.getBackupUrl(), current.getBackupUrl())
+                && Objects.equals(updated.getUsername(), current.getUsername())
+                && Objects.equals(updated.getToken(), current.getToken());
+        if (unchanged) {
+            return;
+        }
+        retryStreamSourceByDataNodeNameAndType(request.getName(), SourceType.MYSQL_SQL, operator);
+    }
+
 }

Reply via email to