This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 45b4f6da8 [INLONG-5799][Manager] Fix the source delete/stop/restart bug (#5807)
45b4f6da8 is described below
commit 45b4f6da843ffa244bdd67b4c05c8c5818708b3c
Author: woofyzhao <[email protected]>
AuthorDate: Wed Sep 7 13:57:54 2022 +0800
[INLONG-5799][Manager] Fix the source delete/stop/restart bug (#5807)
Co-authored-by: healchow <[email protected]>
---
.../service/listener/source/AbstractSourceOperateListener.java | 5 +++++
.../inlong/manager/service/source/StreamSourceServiceImpl.java | 3 ---
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index 7f1097e18..c07c86905 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -99,6 +99,11 @@ public abstract class AbstractSourceOperateListener
implements SourceOperateList
*/
@SneakyThrows
public boolean checkIfOp(StreamSource streamSource, List<StreamSource>
unOperatedSources) {
+ // if a source has sub-sources, it is considered a template source.
+ // template sources do not need to be operated, its sub-sources will
be processed in this method later.
+ if (CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
+ return false;
+ }
for (int retry = 0; retry < 60; retry++) {
int status = streamSource.getStatus();
SourceStatus sourceStatus = SourceStatus.forCode(status);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 04a64e7ac..5f785fe2c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -237,7 +237,6 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
- groupCheckService.checkGroupStatus(entity.getInlongGroupId(),
operator);
SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
@@ -272,7 +271,6 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
LOGGER.info("begin to restart source by id={}", id);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
- groupCheckService.checkGroupStatus(entity.getInlongGroupId(),
operator);
StreamSourceOperator sourceOperator =
operatorFactory.getInstance(entity.getSourceType());
SourceRequest sourceRequest = new SourceRequest();
@@ -290,7 +288,6 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
LOGGER.info("begin to stop source by id={}", id);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity,
ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
- groupCheckService.checkGroupStatus(entity.getInlongGroupId(),
operator);
StreamSourceOperator sourceOperator =
operatorFactory.getInstance(entity.getSourceType());
SourceRequest sourceRequest = new SourceRequest();