This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f5950eed5a [INLONG-9921][Manager] Fix the problem of manager can't stop data sync job (#9942)
f5950eed5a is described below
commit f5950eed5a4f1a0479d2a6e1547b763c83d5c9bf
Author: fuweng11 <[email protected]>
AuthorDate: Tue Apr 9 17:15:32 2024 +0800
[INLONG-9921][Manager] Fix the problem of manager can't stop data sync job (#9942)
* [INLONG-9921][Manager] Fix the problem of manager can't stop data sync job
---
.../inlong/manager/common/enums/GroupStatus.java | 21 +++++++++++----------
.../manager/common/enums/SimpleGroupStatus.java | 4 ++--
.../inlong/manager/common/enums/StreamStatus.java | 15 +++++++--------
.../manager/service/core/impl/AgentServiceImpl.java | 2 +-
.../service/group/InlongGroupProcessService.java | 2 +-
.../listener/group/UpdateGroupCompleteListener.java | 16 +++++++++++++++-
.../stream/UpdateStreamCompleteListener.java | 4 ++--
.../listener/stream/UpdateStreamListener.java | 4 ++--
.../service/stream/InlongStreamProcessService.java | 8 ++++----
.../manager/service/core/impl/AgentServiceTest.java | 8 +++++---
.../group/InlongGroupProcessServiceTest.java | 2 +-
.../source/listener/StreamSourceListenerTest.java | 2 +-
12 files changed, 52 insertions(+), 36 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index 79e89ad53b..759f5a03b4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -41,13 +41,13 @@ public enum GroupStatus {
CONFIG_FAILED(120, "configuration failed"),
CONFIG_SUCCESSFUL(130, "configuration successful"),
- CONFIG_OFFLINE_ING(141, "in configure offline"),
- CONFIGURATION_OFFLINE(140, "configure offline successful"),
+ CONFIG_OFFLINE_ING(141, "configuration is going offline"),
+ CONFIG_OFFLINE_SUCCESSFUL(140, "configuration offline successful"),
- CONFIG_ONLINE_ING(151, "in configure online"),
+ CONFIG_ONLINE_ING(151, "configuration is going online"),
- CONFIG_DELETING(41, "configure deleting"),
- CONFIG_DELETED(40, "configure deleted"),
+ CONFIG_DELETING(41, "configuration deleting"),
+ CONFIG_DELETED(40, "configuration deleted"),
// FINISH is used for batch task.
FINISH(131, "finish");
@@ -71,9 +71,10 @@ public enum GroupStatus {
Sets.newHashSet(CONFIG_SUCCESSFUL, TO_BE_APPROVAL, CONFIG_ING,
CONFIG_OFFLINE_ING, CONFIG_DELETING));
GROUP_STATE_AUTOMATON.put(
- CONFIG_OFFLINE_ING, Sets.newHashSet(CONFIG_OFFLINE_ING, CONFIGURATION_OFFLINE, CONFIG_FAILED));
- GROUP_STATE_AUTOMATON.put(CONFIGURATION_OFFLINE, Sets.newHashSet(CONFIGURATION_OFFLINE, CONFIG_ONLINE_ING,
- CONFIG_DELETING));
+ CONFIG_OFFLINE_ING, Sets.newHashSet(CONFIG_OFFLINE_ING, CONFIG_OFFLINE_SUCCESSFUL, CONFIG_FAILED));
+ GROUP_STATE_AUTOMATON.put(
+ CONFIG_OFFLINE_SUCCESSFUL, Sets.newHashSet(CONFIG_OFFLINE_SUCCESSFUL, CONFIG_ONLINE_ING,
+ CONFIG_DELETING));
GROUP_STATE_AUTOMATON.put(CONFIG_ONLINE_ING,
Sets.newHashSet(CONFIG_ONLINE_ING, CONFIG_FAILED,
CONFIG_SUCCESSFUL));
@@ -143,7 +144,7 @@ public enum GroupStatus {
return status == GroupStatus.APPROVE_PASSED
|| status == GroupStatus.CONFIG_FAILED
|| status == GroupStatus.CONFIG_SUCCESSFUL
- || status == GroupStatus.CONFIGURATION_OFFLINE
+ || status == GroupStatus.CONFIG_OFFLINE_SUCCESSFUL
|| status == GroupStatus.FINISH;
}
@@ -163,7 +164,7 @@ public enum GroupStatus {
*/
public static boolean allowedSuspend(GroupStatus status) {
return status == GroupStatus.CONFIG_SUCCESSFUL
- || status == GroupStatus.CONFIGURATION_OFFLINE
+ || status == GroupStatus.CONFIG_OFFLINE_SUCCESSFUL
|| status == GroupStatus.FINISH;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
index d9f6bdc98d..879902f9d4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
@@ -52,7 +52,7 @@ public enum SimpleGroupStatus {
return FAILED;
case CONFIG_SUCCESSFUL:
return STARTED;
- case CONFIGURATION_OFFLINE:
+ case CONFIG_OFFLINE_SUCCESSFUL:
return STOPPED;
case FINISH:
return FINISHED;
@@ -101,7 +101,7 @@ public enum SimpleGroupStatus {
statusList.add(GroupStatus.CONFIG_SUCCESSFUL.getCode());
return statusList;
case STOPPED:
- statusList.add(GroupStatus.CONFIGURATION_OFFLINE.getCode());
+ statusList.add(GroupStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode());
return statusList;
case FINISHED:
statusList.add(GroupStatus.FINISH.getCode());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
index 383563db34..dad880e37c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
@@ -27,11 +27,10 @@ public enum StreamStatus {
CONFIG_FAILED(120, "configuration failed"),
CONFIG_SUCCESSFUL(130, "configuration successful"),
- SUSPENDING(141, "suspending"),
- SUSPENDED(140, "suspended"),
+ CONFIG_OFFLINE_ING(141, "configuration is going offline"),
+ CONFIG_OFFLINE_SUCCESSFUL(140, "configuration offline successful"),
- RESTARTING(151, "restarting"),
- RESTARTED(150, "restarted"),
+ CONFIG_ONLINE_ING(151, "configuration is going online"),
DELETING(41, "deleting"),
DELETED(40, "deleted");
@@ -48,8 +47,8 @@ public enum StreamStatus {
* Checks whether the given status allows updating operate.
*/
public static boolean notAllowedUpdate(StreamStatus status) {
- return status == StreamStatus.CONFIG_ING || status == StreamStatus.SUSPENDING
- || status == StreamStatus.RESTARTING || status == StreamStatus.DELETING;
+ return status == StreamStatus.CONFIG_ING || status == StreamStatus.CONFIG_OFFLINE_ING
+ || status == StreamStatus.CONFIG_ONLINE_ING || status == StreamStatus.DELETING;
}
/**
@@ -57,8 +56,8 @@ public enum StreamStatus {
*/
public static boolean notAllowedDelete(StreamStatus status) {
return status == StreamStatus.CONFIG_ING
- || status == StreamStatus.RESTARTING
- || status == StreamStatus.SUSPENDING;
+ || status == StreamStatus.CONFIG_ONLINE_ING
+ || status == StreamStatus.CONFIG_OFFLINE_ING;
}
public static StreamStatus forCode(int code) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 682f130747..8d881d81d1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -525,7 +525,7 @@ public class AgentServiceImpl implements AgentService {
List<StreamSourceEntity> sourceEntities = sourceMapper.selectTemplateSourceByCluster(needCopiedStatusList,
Lists.newArrayList(SourceType.FILE), agentClusterName);
Set<GroupStatus> noNeedAddTask = Sets.newHashSet(
- GroupStatus.CONFIGURATION_OFFLINE, GroupStatus.CONFIG_OFFLINE_ING, GroupStatus.CONFIG_DELETING,
+ GroupStatus.CONFIG_OFFLINE_SUCCESSFUL, GroupStatus.CONFIG_OFFLINE_ING, GroupStatus.CONFIG_DELETING,
GroupStatus.CONFIG_DELETED);
sourceEntities.stream()
.forEach(sourceEntity -> {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index a1460e4ed4..dbb78584b7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -336,7 +336,7 @@ public class InlongGroupProcessService {
case CONFIG_ONLINE_ING:
return GroupStatus.CONFIG_SUCCESSFUL;
case CONFIG_OFFLINE_ING:
- return GroupStatus.CONFIGURATION_OFFLINE;
+ return GroupStatus.CONFIG_OFFLINE_SUCCESSFUL;
default:
return GroupStatus.CONFIG_DELETED;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
index d21ca83c92..445ee647ef 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
@@ -22,19 +22,25 @@ import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.enums.SourceStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.List;
+
/**
* The listener of InlongGroup when update operates successfully.
*/
@@ -46,6 +52,8 @@ public class UpdateGroupCompleteListener implements ProcessEventListener {
private InlongGroupService groupService;
@Autowired
private StreamSourceService sourceService;
+ @Autowired
+ private InlongStreamService streamService;
@Override
public ProcessEvent event() {
@@ -63,12 +71,18 @@ public class UpdateGroupCompleteListener implements ProcessEventListener {
InlongGroupInfo groupInfo = form.getGroupInfo();
InlongGroupRequest groupRequest = groupInfo.genRequest();
String operator = context.getOperator();
+ List<InlongStreamInfo> streamInfos = form.getStreamInfos();
+ if (CollectionUtils.isNotEmpty(streamInfos)) {
+ streamInfos.forEach(streamInfo -> streamService.updateWithoutCheck(streamInfo.genRequest(), operator));
+ }
switch (operateType) {
case SUSPEND:
- groupService.updateStatus(groupId, GroupStatus.CONFIGURATION_OFFLINE.getCode(), operator);
+ streamService.updateStatus(groupId, null, StreamStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode(), operator);
+ groupService.updateStatus(groupId, GroupStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode(), operator);
groupService.update(groupRequest, operator);
break;
case RESTART:
+ streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), operator);
groupService.update(groupRequest, operator);
break;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamCompleteListener.java
index 8d07861374..896feaefb0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamCompleteListener.java
@@ -57,10 +57,10 @@ public class UpdateStreamCompleteListener implements ProcessEventListener {
StreamStatus status;
switch (operateType) {
case RESTART:
- status = StreamStatus.RESTARTED;
+ status = StreamStatus.CONFIG_SUCCESSFUL;
break;
case SUSPEND:
- status = StreamStatus.SUSPENDED;
+ status = StreamStatus.CONFIG_OFFLINE_SUCCESSFUL;
break;
case DELETE:
status = StreamStatus.DELETED;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamListener.java
index f5235a3d1b..fd7faaf527 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamListener.java
@@ -54,10 +54,10 @@ public class UpdateStreamListener implements ProcessEventListener {
final String streamId = streamInfo.getInlongStreamId();
switch (operateType) {
case SUSPEND:
- streamService.updateStatus(groupId, streamId, StreamStatus.SUSPENDING.getCode(), operator);
+ streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_OFFLINE_ING.getCode(), operator);
break;
case RESTART:
- streamService.updateStatus(groupId, streamId, StreamStatus.RESTARTING.getCode(), operator);
+ streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_ONLINE_ING.getCode(), operator);
break;
case DELETE:
streamService.updateStatus(groupId, streamId, StreamStatus.DELETING.getCode(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index b37fd7b5b6..59a1390111 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -134,12 +134,12 @@ public class InlongStreamProcessService {
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
Preconditions.expectNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
- if (status == StreamStatus.SUSPENDED || status == StreamStatus.SUSPENDING) {
+ if (status == StreamStatus.CONFIG_OFFLINE_SUCCESSFUL || status == StreamStatus.CONFIG_OFFLINE_ING) {
log.warn("groupId={}, streamId={} is already in {}", groupId, streamId, status);
return true;
}
- if (status != StreamStatus.CONFIG_SUCCESSFUL && status != StreamStatus.RESTARTED) {
+ if (status != StreamStatus.CONFIG_SUCCESSFUL) {
throw new BusinessException(String.format("stream status=%s not support suspend stream"
+ " for groupId=%s streamId=%s", status, groupId, streamId));
}
@@ -176,12 +176,12 @@ public class InlongStreamProcessService {
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
Preconditions.expectNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
- if (status == StreamStatus.RESTARTED || status == StreamStatus.RESTARTING) {
+ if (status == StreamStatus.CONFIG_ONLINE_ING) {
log.warn("inlong stream was already in {} for groupId={}, streamId={}", status, groupId, streamId);
return true;
}
- if (status != StreamStatus.SUSPENDED) {
+ if (status != StreamStatus.CONFIG_OFFLINE_SUCCESSFUL) {
throw new BusinessException(String.format("stream status=%s not support restart stream"
+ " for groupId=%s streamId=%s", status, groupId, streamId));
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index 41acbdfd86..9b267eaa3b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -143,8 +143,9 @@ class AgentServiceTest extends ServiceBaseTest {
sources.stream()
.filter(source -> source.getTaskMapId() != null)
.forEach(source -> sourceService.stop(source.getId(), GLOBAL_OPERATOR));
- groupMapper.updateStatus(groupId, GroupStatus.CONFIGURATION_OFFLINE.getCode(), GLOBAL_OPERATOR);
- streamMapper.updateStatusByIdentifier(groupId, streamId, StreamStatus.SUSPENDED.getCode(), GLOBAL_OPERATOR);
+ groupMapper.updateStatus(groupId, GroupStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode(), GLOBAL_OPERATOR);
+ streamMapper.updateStatusByIdentifier(groupId, streamId, StreamStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode(),
+ GLOBAL_OPERATOR);
}
/**
@@ -156,7 +157,8 @@ class AgentServiceTest extends ServiceBaseTest {
.filter(source -> source.getTaskMapId() != null)
.forEach(source -> sourceService.restart(source.getId(), GLOBAL_OPERATOR));
groupMapper.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), GLOBAL_OPERATOR);
- streamMapper.updateStatusByIdentifier(groupId, streamId, StreamStatus.RESTARTED.getCode(), GLOBAL_OPERATOR);
+ streamMapper.updateStatusByIdentifier(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(),
+ GLOBAL_OPERATOR);
}
public void deleteSource(String groupId, String streamId) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupProcessServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupProcessServiceTest.java
index 96bbef6a7a..5087849289 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupProcessServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupProcessServiceTest.java
@@ -96,7 +96,7 @@ public class InlongGroupProcessServiceTest extends ServiceBaseTest {
ProcessResponse response = result.getProcessInfo();
Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
InlongGroupInfo groupInfo = groupService.get(GROUP_ID);
- Assertions.assertEquals(groupInfo.getStatus(), GroupStatus.CONFIGURATION_OFFLINE.getCode());
+ Assertions.assertEquals(groupInfo.getStatus(), GroupStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode());
}
private void testRestartProcess() {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
index 42df6bfc65..99e1420915 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
@@ -103,7 +103,7 @@ public class StreamSourceListenerTest extends ServiceBaseTest {
private void testRestartSource(Integer sourceId) {
groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_OFFLINE_ING.getCode(), GLOBAL_OPERATOR);
groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR);
- groupService.updateStatus(GROUP_ID, GroupStatus.CONFIGURATION_OFFLINE.getCode(), GLOBAL_OPERATOR);
+ groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode(), GLOBAL_OPERATOR);
groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR);
sourceService.updateStatus(GROUP_ID, null, SourceStatus.SOURCE_NORMAL.getCode(), GLOBAL_OPERATOR);