This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cb5f1517e [INLONG-11368][Manager] Determine whether to issue a streamSource based on the stream status (#11371)
7cb5f1517e is described below

commit 7cb5f1517ee55d2bf1483cca800f81af6444f0f2
Author: fuweng11 <[email protected]>
AuthorDate: Fri Oct 18 14:15:22 2024 +0800

    [INLONG-11368][Manager] Determine whether to issue a streamSource based on the stream status (#11371)
---
 .../manager/service/group/GroupCheckService.java     | 20 ++++++++++++++++++--
 .../service/source/AbstractSourceOperator.java       | 16 ++++++++--------
 .../manager/service/source/StreamSourceOperator.java |  4 ++--
 .../service/source/StreamSourceServiceImpl.java      | 10 +++++++---
 4 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
index fe1c7a4451..6824cba63b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.service.group;
 
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
-import org.apache.inlong.manager.dao.mapper.UserEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -36,7 +38,7 @@ public class GroupCheckService {
     @Autowired
     private InlongGroupEntityMapper groupMapper;
     @Autowired
-    private UserEntityMapper userMapper;
+    private InlongStreamEntityMapper streamMapper;
 
     /**
      * Check whether the inlong group status is temporary
@@ -58,4 +60,18 @@ public class GroupCheckService {
         return inlongGroupEntity;
     }
 
+    public InlongStreamEntity checkStreamStatus(String groupId, String 
streamId, String operator) {
+        InlongStreamEntity inlongStreamEntity = 
streamMapper.selectByIdentifier(groupId, streamId);
+        if (inlongStreamEntity == null) {
+            throw new BusinessException(
+                    String.format("InlongStream does not exist with 
groupId=%s, streamId=%s", groupId, streamId));
+        }
+
+        StreamStatus status = 
StreamStatus.forCode(inlongStreamEntity.getStatus());
+        if (StreamStatus.notAllowedUpdate(status)) {
+            throw new 
BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(),
 status));
+        }
+
+        return inlongStreamEntity;
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 885fd2a99e..cdf6290281 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -32,8 +32,8 @@ import 
org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -146,12 +146,12 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
-    public Integer saveOpt(SourceRequest request, Integer groupStatus, String 
operator) {
+    public Integer saveOpt(SourceRequest request, Integer streamStatus, String 
operator) {
         StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, 
StreamSourceEntity::new);
         if (SourceType.AUTO_PUSH.equals(request.getSourceType())) {
             // auto push task needs not be issued to agent
             entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode());
-        } else if 
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+        } else if 
(StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
             entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
         } else {
             entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
@@ -166,7 +166,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         if (request.getEnableSyncSchema()) {
             syncSourceFieldInfo(request, operator);
         }
-        if 
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+        if 
(StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
             updateAgentTaskConfig(request, operator);
         }
         return entity.getId();
@@ -188,7 +188,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
 
     @Override
     @Transactional(rollbackFor = Throwable.class, isolation = 
Isolation.REPEATABLE_READ)
-    public void updateOpt(SourceRequest request, Integer groupStatus, Integer 
groupMode, String operator) {
+    public void updateOpt(SourceRequest request, Integer streamStatus, Integer 
groupMode, String operator) {
         StreamSourceEntity entity = 
sourceMapper.selectByIdForUpdate(request.getId());
         if (entity == null) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
@@ -242,7 +242,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         if (InlongConstants.STANDARD_MODE.equals(groupMode)) {
             SourceStatus sourceStatus = 
SourceStatus.forCode(entity.getStatus());
             Integer nextStatus = entity.getStatus();
-            if 
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+            if 
(StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
                 nextStatus = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
             } else {
                 switch (SourceStatus.forCode(entity.getStatus())) {
@@ -267,7 +267,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         }
         updateFieldOpt(entity, request.getFieldList());
         LOGGER.debug("success to update source of type={}", 
request.getSourceType());
-        if 
(GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+        if 
(StreamStatus.forCode(streamStatus).equals(StreamStatus.CONFIG_SUCCESSFUL)) {
             updateAgentTaskConfig(request, operator);
         }
     }
@@ -479,7 +479,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
                         return cmdConfig;
                     }).collect(Collectors.toList());
             if (CollectionUtils.isEmpty(taskLists)) {
-                agentTaskConfigEntity.setTaskParams(null);
+                agentTaskConfigEntity.setTaskParams("");
                 
agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity);
                 return;
             }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index e820140ae6..07b0879330 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -50,11 +50,11 @@ public interface StreamSourceOperator {
      * Save the source info.
      *
      * @param request request of source
-     * @param groupStatus the belongs group status
+     * @param streamStatus the belongs stream status
      * @param operator name of operator
      * @return source id after saving
      */
-    Integer saveOpt(SourceRequest request, Integer groupStatus, String 
operator);
+    Integer saveOpt(SourceRequest request, Integer streamStatus, String 
operator);
 
     /**
      * Get source info by the given entity.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index de96386b25..e82d9d2aeb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -27,6 +27,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
@@ -103,8 +104,8 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
 
         // Check if it can be added
         String groupId = request.getInlongGroupId();
-        InlongGroupEntity groupEntity = 
groupCheckService.checkGroupStatus(groupId, operator);
         String streamId = request.getInlongStreamId();
+        InlongStreamEntity streamEntity = 
groupCheckService.checkStreamStatus(groupId, streamId, operator);
         String sourceName = request.getSourceName();
         List<StreamSourceEntity> existList = 
sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
         if (CollectionUtils.isNotEmpty(existList)) {
@@ -119,7 +120,7 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         if (CollectionUtils.isNotEmpty(streamFields)) {
             streamFields.forEach(streamField -> streamField.setId(null));
         }
-        int id = sourceOperator.saveOpt(request, groupEntity.getStatus(), 
operator);
+        int id = sourceOperator.saveOpt(request, streamEntity.getStatus(), 
operator);
 
         LOGGER.info("success to save source info: {}", request);
         return id;
@@ -300,6 +301,9 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
                     String.format("InlongGroup does not exist with 
InlongGroupId=%s", groupId));
         }
+        String streamId = request.getInlongStreamId();
+        InlongStreamEntity streamEntity = 
groupCheckService.checkStreamStatus(groupId, streamId, operator);
+
         userService.checkUser(groupEntity.getInCharges(), operator,
                 "Current user does not have permission to update source info");
         StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(request.getSourceType());
@@ -308,7 +312,7 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         if (CollectionUtils.isNotEmpty(streamFields)) {
             streamFields.forEach(streamField -> streamField.setId(null));
         }
-        sourceOperator.updateOpt(request, groupEntity.getStatus(), 
groupEntity.getInlongGroupMode(), operator);
+        sourceOperator.updateOpt(request, streamEntity.getStatus(), 
groupEntity.getInlongGroupMode(), operator);
 
         LOGGER.info("success to update source info: {}", request);
         return true;

Reply via email to