This is an automated email from the ASF dual-hosted git repository.
wakefu pushed a commit to branch branch-1.13
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.13 by this push:
new ecfae06b9c [INLONG-10714][Manager] Fix the problem of incorrect
deletion of data source task (#10715)
ecfae06b9c is described below
commit ecfae06b9ce1d54c85c685ea30eda20c35c855c1
Author: fuweng11 <[email protected]>
AuthorDate: Wed Jul 24 19:31:15 2024 +0800
[INLONG-10714][Manager] Fix the problem of incorrect deletion of data
source task (#10715)
* [INLONG-10714][Manager] Fix the problem of incorrect deletion of data
source task
* [INLONG-10714][Manager] Fix the problem of incorrect deletion of data
source task
---
.../source/AbstractSourceOperateListener.java | 60 ++--------------------
.../service/source/AbstractSourceOperator.java | 8 ---
.../service/source/StreamSourceServiceImpl.java | 9 ++--
3 files changed, 7 insertions(+), 70 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index ccefd5cf10..6b19648b11 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -18,9 +18,7 @@
package org.apache.inlong.manager.service.listener.source;
import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -33,15 +31,11 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
-import com.google.common.collect.Lists;
-import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
-import java.util.concurrent.TimeUnit;
/**
* Event listener of operate resources, such as delete, stop, restart sources.
@@ -67,67 +61,21 @@ public abstract class AbstractSourceOperateListener
implements SourceOperateList
InlongGroupInfo groupInfo = getGroupInfo(context.getProcessForm());
final String groupId = groupInfo.getInlongGroupId();
List<InlongStreamBriefInfo> streamResponses =
streamService.listBriefWithSink(groupId);
- List<StreamSource> unOperatedSources = Lists.newArrayList();
- streamResponses.forEach(stream -> operateStreamSources(groupId,
stream.getInlongStreamId(),
- context.getOperator(), unOperatedSources));
-
- if (CollectionUtils.isNotEmpty(unOperatedSources)) {
- GroupOperateType operateType =
getOperateType(context.getProcessForm());
- StringBuilder builder = new StringBuilder("Unsupported operate
").append(operateType).append(" for (");
- unOperatedSources.forEach(source -> builder.append("
").append(source.getSourceName()).append(" "));
- String errMsg = builder.append(")").toString();
- throw new WorkflowListenerException(errMsg);
- }
-
+ streamResponses
+ .forEach(stream -> operateStreamSources(groupId,
stream.getInlongStreamId(), context.getOperator()));
return ListenerResult.success();
}
/**
* Operate stream sources, such as delete, stop, restart.
*/
- protected void operateStreamSources(String groupId, String streamId,
String operator,
- List<StreamSource> unOperatedSources) {
+ protected void operateStreamSources(String groupId, String streamId,
String operator) {
List<StreamSource> sources = streamSourceService.listSource(groupId,
streamId);
sources.forEach(source -> {
- if (checkIfOp(source, unOperatedSources)) {
- operateStreamSource(source.genSourceRequest(), operator);
- }
+ operateStreamSource(source.genSourceRequest(), operator);
});
}
- /**
- * Check source status.
- */
- @SneakyThrows
- public boolean checkIfOp(StreamSource streamSource, List<StreamSource>
unOperatedSources) {
- for (int retry = 0; retry < 60; retry++) {
- int status = streamSource.getStatus();
- SourceStatus sourceStatus = SourceStatus.forCode(status);
- // template sources are filtered and processed in corresponding
subclass listeners
- if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus ==
SourceStatus.SOURCE_STOP
- || sourceStatus == SourceStatus.HEARTBEAT_TIMEOUT
- ||
CollectionUtils.isNotEmpty(streamSource.getDataAddTaskList())) {
- return true;
- } else if (sourceStatus == SourceStatus.SOURCE_FAILED ||
sourceStatus == SourceStatus.SOURCE_DISABLE) {
- return false;
- } else {
- log.warn("stream source={} cannot be operated for status={}",
streamSource, sourceStatus);
- TimeUnit.SECONDS.sleep(5);
- streamSource = streamSourceService.get(streamSource.getId());
- }
- }
- SourceStatus sourceStatus =
SourceStatus.forCode(streamSource.getStatus());
- if (sourceStatus != SourceStatus.SOURCE_NORMAL
- && sourceStatus != SourceStatus.SOURCE_STOP
- && sourceStatus != SourceStatus.SOURCE_DISABLE
- && sourceStatus != SourceStatus.SOURCE_FAILED
- && sourceStatus != SourceStatus.HEARTBEAT_TIMEOUT) {
- log.error("stream source ={} cannot be operated for status={}",
streamSource, sourceStatus);
- unOperatedSources.add(streamSource);
- }
- return false;
- }
-
/**
* Operate stream sources ,such as delete, stop, restart.
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 2ecd79fb37..7c4c0c4882 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -277,10 +277,6 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
if (curState == SourceStatus.SOURCE_STOP) {
return;
}
- if (!SourceStatus.isAllowedTransition(curState, nextState)) {
- throw new BusinessException(String.format("current source
status=%s for id=%s is not allowed to stop",
- existEntity.getStatus(), existEntity.getId()));
- }
StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request,
StreamSourceEntity::new);
curEntity.setPreviousStatus(curState.getCode());
curEntity.setStatus(nextState.getCode());
@@ -300,10 +296,6 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
StreamSourceEntity existEntity =
sourceMapper.selectByIdForUpdate(request.getId());
SourceStatus curState = SourceStatus.forCode(existEntity.getStatus());
SourceStatus nextState = SourceStatus.TO_BE_ISSUED_ACTIVE;
- if (!SourceStatus.isAllowedTransition(curState, nextState)) {
- throw new BusinessException(String.format("current source
status=%s for id=%s is not allowed to restart",
- existEntity.getStatus(), existEntity.getId()));
- }
StreamSourceEntity curEntity = CommonBeanUtils.copyProperties(request,
StreamSourceEntity::new);
curEntity.setPreviousStatus(curState.getCode());
curEntity.setStatus(nextState.getCode());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index aab60a8fdb..ca6865bdbb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -343,11 +343,6 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
|| SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
nextStatus = SourceStatus.SOURCE_DISABLE;
}
- if (!SourceStatus.isAllowedTransition(curStatus, nextStatus)) {
- throw new BusinessException(
- String.format("current source status=%s for id=%s is not
allowed to delete", entity.getStatus(),
- entity.getId()));
- }
entity.setPreviousStatus(curStatus.getCode());
entity.setStatus(nextStatus.getCode());
@@ -360,7 +355,9 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
sourceFieldMapper.deleteAll(id);
-
+ SourceRequest request = CommonBeanUtils.copyProperties(entity,
SourceRequest::new, true);
+ StreamSourceOperator sourceOperator =
operatorFactory.getInstance(request.getSourceType());
+ sourceOperator.updateAgentTaskConfig(request, operator);
LOGGER.info("success to delete source for id={} by user={}", id,
operator);
return true;
}