This is an automated email from the ASF dual-hosted git repository.

wakefu pushed a commit to branch branch-1.13
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/branch-1.13 by this push:
     new ecfae06b9c [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task (#10715)
ecfae06b9c is described below

commit ecfae06b9ce1d54c85c685ea30eda20c35c855c1
Author: fuweng11 <[email protected]>
AuthorDate: Wed Jul 24 19:31:15 2024 +0800

    [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task (#10715)
    
    * [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task
    
    * [INLONG-10714][Manager] Fix the problem of incorrect deletion of data source task
---
 .../source/AbstractSourceOperateListener.java      | 60 ++--------------------
 .../service/source/AbstractSourceOperator.java     |  8 ---
 .../service/source/StreamSourceServiceImpl.java    |  9 ++--
 3 files changed, 7 insertions(+), 70 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index ccefd5cf10..6b19648b11 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -18,9 +18,7 @@
 package org.apache.inlong.manager.service.listener.source;
 
 import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -33,15 +31,11 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 
-import com.google.common.collect.Lists;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Event listener of operate resources, such as delete, stop, restart sources.
@@ -67,67 +61,21 @@ public abstract class AbstractSourceOperateListener 
implements SourceOperateList
         InlongGroupInfo groupInfo = getGroupInfo(context.getProcessForm());
         final String groupId = groupInfo.getInlongGroupId();
         List<InlongStreamBriefInfo> streamResponses = 
streamService.listBriefWithSink(groupId);
-        List<StreamSource> unOperatedSources = Lists.newArrayList();
-        streamResponses.forEach(stream -> operateStreamSources(groupId, 
stream.getInlongStreamId(),
-                context.getOperator(), unOperatedSources));
-
-        if (CollectionUtils.isNotEmpty(unOperatedSources)) {
-            GroupOperateType operateType = 
getOperateType(context.getProcessForm());
-            StringBuilder builder = new StringBuilder("Unsupported operate 
").append(operateType).append(" for (");
-            unOperatedSources.forEach(source -> builder.append(" 
").append(source.getSourceName()).append(" "));
-            String errMsg = builder.append(")").toString();
-            throw new WorkflowListenerException(errMsg);
-        }
-
+        streamResponses
+                .forEach(stream -> operateStreamSources(groupId, 
stream.getInlongStreamId(), context.getOperator()));
         return ListenerResult.success();
     }
 
     /**
      * Operate stream sources, such as delete, stop, restart.
      */
-    protected void operateStreamSources(String groupId, String streamId, 
String operator,
-            List<StreamSource> unOperatedSources) {
+    protected void operateStreamSources(String groupId, String streamId, 
String operator) {
         List<StreamSource> sources = streamSourceService.listSource(groupId, 
streamId);
         sources.forEach(source -> {
-            if (checkIfOp(source, unOperatedSources)) {
-                operateStreamSource(source.genSourceRequest(), operator);
-            }
+            operateStreamSource(source.genSourceRequest(), operator);
         });
     }
 
-    /**
-     * Check source status.
-     */
-    @SneakyThrows
-    public boolean checkIfOp(StreamSource streamSource, List<StreamSource> 
unOperatedSources) {
-        for (int retry = 0; retry < 60; retry++) {
-            int status = streamSource.getStatus();
-            SourceStatus sourceStatus = SourceStatus.forCode(status);
-            // template sources are filtered and processed in corresponding 
subclass listeners
-            if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == 
SourceStatus.SOURCE_STOP
-                    || sourceStatus == SourceStatus.HEARTBEAT_TIMEOUT
-                    || 
CollectionUtils.isNotEmpty(streamSource.getDataAddTaskList())) {
-                return true;
-            } else if (sourceStatus == SourceStatus.SOURCE_FAILED || 
sourceStatus == SourceStatus.SOURCE_DISABLE) {
-                return false;
-            } else {
-                log.warn("stream source={} cannot be operated for status={}", 
streamSource, sourceStatus);
-                TimeUnit.SECONDS.sleep(5);
-                streamSource = streamSourceService.get(streamSource.getId());
-            }
-        }
-        SourceStatus sourceStatus = 
SourceStatus.forCode(streamSource.getStatus());
-        if (sourceStatus != SourceStatus.SOURCE_NORMAL
-                && sourceStatus != SourceStatus.SOURCE_STOP
-                && sourceStatus != SourceStatus.SOURCE_DISABLE
-                && sourceStatus != SourceStatus.SOURCE_FAILED
-                && sourceStatus != SourceStatus.HEARTBEAT_TIMEOUT) {
-            log.error("stream source ={} cannot be operated for status={}", 
streamSource, sourceStatus);
-            unOperatedSources.add(streamSource);
-        }
-        return false;
-    }
-
     /**
      * Operate stream sources ,such as delete, stop, restart.
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 2ecd79fb37..7c4c0c4882 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -277,10 +277,6 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         if (curState == SourceStatus.SOURCE_STOP) {
             return;
         }
-        if (!SourceStatus.isAllowedTransition(curState, nextState)) {
-            throw new BusinessException(String.format("current source 
status=%s for id=%s is not allowed to stop",
-                    existEntity.getStatus(), existEntity.getId()));
-        }
         StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, 
StreamSourceEntity::new);
         curEntity.setPreviousStatus(curState.getCode());
         curEntity.setStatus(nextState.getCode());
@@ -300,10 +296,6 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         StreamSourceEntity existEntity = 
sourceMapper.selectByIdForUpdate(request.getId());
         SourceStatus curState = SourceStatus.forCode(existEntity.getStatus());
         SourceStatus nextState = SourceStatus.TO_BE_ISSUED_ACTIVE;
-        if (!SourceStatus.isAllowedTransition(curState, nextState)) {
-            throw new BusinessException(String.format("current source 
status=%s for id=%s is not allowed to restart",
-                    existEntity.getStatus(), existEntity.getId()));
-        }
         StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request, 
StreamSourceEntity::new);
         curEntity.setPreviousStatus(curState.getCode());
         curEntity.setStatus(nextState.getCode());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index aab60a8fdb..ca6865bdbb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -343,11 +343,6 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
                 || SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
             nextStatus = SourceStatus.SOURCE_DISABLE;
         }
-        if (!SourceStatus.isAllowedTransition(curStatus, nextStatus)) {
-            throw new BusinessException(
-                    String.format("current source status=%s for id=%s is not 
allowed to delete", entity.getStatus(),
-                            entity.getId()));
-        }
 
         entity.setPreviousStatus(curStatus.getCode());
         entity.setStatus(nextStatus.getCode());
@@ -360,7 +355,9 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
         sourceFieldMapper.deleteAll(id);
-
+        SourceRequest request = CommonBeanUtils.copyProperties(entity, 
SourceRequest::new, true);
+        StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(request.getSourceType());
+        sourceOperator.updateAgentTaskConfig(request, operator);
         LOGGER.info("success to delete source for id={} by user={}", id, 
operator);
         return true;
     }

Reply via email to