This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 390b4b064 [INLONG-7468][Manager] Fix re-executing the workflow fails
to load the new configuration information (#7469)
390b4b064 is described below
commit 390b4b064de6b31407fe8e5dfab7a456361aed9c
Author: fuweng11 <[email protected]>
AuthorDate: Wed Mar 1 10:03:31 2023 +0800
[INLONG-7468][Manager] Fix re-executing the workflow fails to load the new
configuration information (#7469)
---
.../service/listener/queue/QueueResourceListener.java | 6 ++++++
.../listener/queue/StreamQueueResourceListener.java | 7 +++++++
.../service/listener/sink/SinkResourceListener.java | 5 ++++-
.../service/listener/sort/SortConfigListener.java | 18 ++++++++++++++++--
.../listener/sort/StreamSortConfigListener.java | 11 ++++++++++-
5 files changed, 43 insertions(+), 4 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index 152832ba1..8e50884f0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -33,6 +33,7 @@ import
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProces
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import
org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -77,6 +78,8 @@ public class QueueResourceListener implements
QueueOperateListener {
@Autowired
private InlongGroupService groupService;
@Autowired
+ private InlongStreamService streamService;
+ @Autowired
private WorkflowService workflowService;
@Autowired
private QueueResourceOperatorFactory queueOperatorFactory;
@@ -106,6 +109,9 @@ public class QueueResourceListener implements
QueueOperateListener {
log.error(msg);
throw new WorkflowListenerException(msg);
}
+ // Read the current information
+ groupProcessForm.setGroupInfo(groupInfo);
+ groupProcessForm.setStreamInfos(streamService.list(groupId));
if
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource()))
{
log.warn("skip to execute QueueResourceListener as disable create
resource for groupId={}", groupId);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
index eb3aaab8c..a9e778a8f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
@@ -29,6 +29,7 @@ import
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProces
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import
org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -45,6 +46,8 @@ public class StreamQueueResourceListener implements
QueueOperateListener {
@Autowired
private InlongGroupService groupService;
@Autowired
+ private InlongStreamService streamService;
+ @Autowired
private QueueResourceOperatorFactory queueOperatorFactory;
@Override
@@ -80,6 +83,10 @@ public class StreamQueueResourceListener implements
QueueOperateListener {
log.error(msg);
throw new WorkflowListenerException(msg);
}
+ final String streamId = streamInfo.getInlongStreamId();
+ // Read the current information
+ streamProcessForm.setGroupInfo(groupInfo);
+ streamProcessForm.setStreamInfo(streamService.get(groupId, streamId));
if
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource()))
{
log.warn("skip to execute StreamQueueResourceListener as disable
create resource for groupId={}", groupId);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sink/SinkResourceListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sink/SinkResourceListener.java
index 2ceda3f0c..49c07d6a2 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sink/SinkResourceListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sink/SinkResourceListener.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import
org.apache.inlong.manager.service.resource.sink.SinkResourceOperatorFactory;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
@@ -48,6 +49,8 @@ public class SinkResourceListener implements
SinkOperateListener {
@Autowired
private StreamSinkEntityMapper sinkMapper;
@Autowired
+ private InlongStreamService streamService;
+ @Autowired
private SinkResourceOperatorFactory sinkOperatorFactory;
@Override
@@ -62,7 +65,7 @@ public class SinkResourceListener implements
SinkOperateListener {
log.info("begin to create sink resources for groupId={}", groupId);
List<String> streamIdList = new ArrayList<>();
- List<InlongStreamInfo> streamList = form.getStreamInfos();
+ List<InlongStreamInfo> streamList = streamService.list(groupId);
if (CollectionUtils.isNotEmpty(streamList)) {
streamIdList =
streamList.stream().map(InlongStreamInfo::getInlongStreamId).collect(Collectors.toList());
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index 39a3b78dc..ee1b24e86 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -26,8 +26,10 @@ import
org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
+import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.resource.sort.SortConfigOperator;
import
org.apache.inlong.manager.service.resource.sort.SortConfigOperatorFactory;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -49,6 +51,10 @@ public class SortConfigListener implements
SortOperateListener {
@Autowired
private SortConfigOperatorFactory operatorFactory;
+ @Autowired
+ private InlongGroupService groupService;
+ @Autowired
+ private InlongStreamService streamService;
@Override
public TaskEvent event() {
@@ -80,8 +86,16 @@ public class SortConfigListener implements
SortOperateListener {
LOGGER.info("no need to build sort config for groupId={} as the
operate type is {}", groupId, operateType);
return ListenerResult.success();
}
-
- InlongGroupInfo groupInfo = form.getGroupInfo();
+ // ensure the inlong group exists
+ InlongGroupInfo groupInfo = groupService.get(groupId);
+ if (groupInfo == null) {
+ String msg = "inlong group not found with groupId=" + groupId;
+ LOGGER.error(msg);
+ throw new WorkflowListenerException(msg);
+ }
+ // Read the current information
+ form.setGroupInfo(groupInfo);
+ form.setStreamInfos(streamService.list(groupId));
List<InlongStreamInfo> streamInfos = form.getStreamInfos();
if (CollectionUtils.isEmpty(streamInfos)) {
LOGGER.warn("no need to build sort config for groupId={}, as not
found any stream", groupId);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
index 299d65eef..c2e943d03 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
@@ -26,8 +26,10 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
+import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.resource.sort.SortConfigOperator;
import
org.apache.inlong.manager.service.resource.sort.SortConfigOperatorFactory;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -50,6 +52,10 @@ public class StreamSortConfigListener implements
SortOperateListener {
@Autowired
private SortConfigOperatorFactory operatorFactory;
+ @Autowired
+ private InlongGroupService groupService;
+ @Autowired
+ private InlongStreamService streamService;
@Override
public TaskEvent event() {
@@ -85,12 +91,15 @@ public class StreamSortConfigListener implements
SortOperateListener {
return ListenerResult.success();
}
- InlongGroupInfo groupInfo = form.getGroupInfo();
+ InlongGroupInfo groupInfo = groupService.get(groupId);
List<StreamSink> streamSinks = streamInfo.getSinkList();
if (CollectionUtils.isEmpty(streamSinks)) {
LOGGER.warn("not build sort config for groupId={}, streamId={}, as
not found any sinks", groupId, streamId);
return ListenerResult.success();
}
+ // Read the current information
+ form.setGroupInfo(groupInfo);
+ form.setStreamInfo(streamService.get(groupId, streamId));
List<InlongStreamInfo> streamInfos =
Collections.singletonList(streamInfo);
try {