This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 390b4b064 [INLONG-7468][Manager] Fix re-executing the workflow fails to load the new configuration information (#7469)
390b4b064 is described below

commit 390b4b064de6b31407fe8e5dfab7a456361aed9c
Author: fuweng11 <[email protected]>
AuthorDate: Wed Mar 1 10:03:31 2023 +0800

    [INLONG-7468][Manager] Fix re-executing the workflow fails to load the new configuration information (#7469)
---
 .../service/listener/queue/QueueResourceListener.java  |  6 ++++++
 .../listener/queue/StreamQueueResourceListener.java    |  7 +++++++
 .../service/listener/sink/SinkResourceListener.java    |  5 ++++-
 .../service/listener/sort/SortConfigListener.java      | 18 ++++++++++++++++--
 .../listener/sort/StreamSortConfigListener.java        | 11 ++++++++++-
 5 files changed, 43 insertions(+), 4 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index 152832ba1..8e50884f0 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -33,6 +33,7 @@ import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProces
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
 import 
org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -77,6 +78,8 @@ public class QueueResourceListener implements 
QueueOperateListener {
     @Autowired
     private InlongGroupService groupService;
     @Autowired
+    private InlongStreamService streamService;
+    @Autowired
     private WorkflowService workflowService;
     @Autowired
     private QueueResourceOperatorFactory queueOperatorFactory;
@@ -106,6 +109,9 @@ public class QueueResourceListener implements 
QueueOperateListener {
             log.error(msg);
             throw new WorkflowListenerException(msg);
         }
+        // Read the current information
+        groupProcessForm.setGroupInfo(groupInfo);
+        groupProcessForm.setStreamInfos(streamService.list(groupId));
 
         if 
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource()))
 {
             log.warn("skip to execute QueueResourceListener as disable create 
resource for groupId={}", groupId);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
index eb3aaab8c..a9e778a8f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/StreamQueueResourceListener.java
@@ -29,6 +29,7 @@ import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProces
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
 import 
org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -45,6 +46,8 @@ public class StreamQueueResourceListener implements 
QueueOperateListener {
     @Autowired
     private InlongGroupService groupService;
     @Autowired
+    private InlongStreamService streamService;
+    @Autowired
     private QueueResourceOperatorFactory queueOperatorFactory;
 
     @Override
@@ -80,6 +83,10 @@ public class StreamQueueResourceListener implements 
QueueOperateListener {
             log.error(msg);
             throw new WorkflowListenerException(msg);
         }
+        final String streamId = streamInfo.getInlongStreamId();
+        // Read the current information
+        streamProcessForm.setGroupInfo(groupInfo);
+        streamProcessForm.setStreamInfo(streamService.get(groupId, streamId));
 
         if 
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource()))
 {
             log.warn("skip to execute StreamQueueResourceListener as disable 
create resource for groupId={}", groupId);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sink/SinkResourceListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sink/SinkResourceListener.java
index 2ceda3f0c..49c07d6a2 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sink/SinkResourceListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sink/SinkResourceListener.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
 import 
org.apache.inlong.manager.service.resource.sink.SinkResourceOperatorFactory;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
@@ -48,6 +49,8 @@ public class SinkResourceListener implements 
SinkOperateListener {
     @Autowired
     private StreamSinkEntityMapper sinkMapper;
     @Autowired
+    private InlongStreamService streamService;
+    @Autowired
     private SinkResourceOperatorFactory sinkOperatorFactory;
 
     @Override
@@ -62,7 +65,7 @@ public class SinkResourceListener implements 
SinkOperateListener {
         log.info("begin to create sink resources for groupId={}", groupId);
 
         List<String> streamIdList = new ArrayList<>();
-        List<InlongStreamInfo> streamList = form.getStreamInfos();
+        List<InlongStreamInfo> streamList = streamService.list(groupId);
         if (CollectionUtils.isNotEmpty(streamList)) {
             streamIdList = 
streamList.stream().map(InlongStreamInfo::getInlongStreamId).collect(Collectors.toList());
         }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index 39a3b78dc..ee1b24e86 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -26,8 +26,10 @@ import 
org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
+import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.resource.sort.SortConfigOperator;
 import 
org.apache.inlong.manager.service.resource.sort.SortConfigOperatorFactory;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -49,6 +51,10 @@ public class SortConfigListener implements 
SortOperateListener {
 
     @Autowired
     private SortConfigOperatorFactory operatorFactory;
+    @Autowired
+    private InlongGroupService groupService;
+    @Autowired
+    private InlongStreamService streamService;
 
     @Override
     public TaskEvent event() {
@@ -80,8 +86,16 @@ public class SortConfigListener implements 
SortOperateListener {
             LOGGER.info("no need to build sort config for groupId={} as the 
operate type is {}", groupId, operateType);
             return ListenerResult.success();
         }
-
-        InlongGroupInfo groupInfo = form.getGroupInfo();
+        // ensure the inlong group exists
+        InlongGroupInfo groupInfo = groupService.get(groupId);
+        if (groupInfo == null) {
+            String msg = "inlong group not found with groupId=" + groupId;
+            LOGGER.error(msg);
+            throw new WorkflowListenerException(msg);
+        }
+        // Read the current information
+        form.setGroupInfo(groupInfo);
+        form.setStreamInfos(streamService.list(groupId));
         List<InlongStreamInfo> streamInfos = form.getStreamInfos();
         if (CollectionUtils.isEmpty(streamInfos)) {
             LOGGER.warn("no need to build sort config for groupId={}, as not 
found any stream", groupId);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
index 299d65eef..c2e943d03 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
@@ -26,8 +26,10 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
+import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.resource.sort.SortConfigOperator;
 import 
org.apache.inlong.manager.service.resource.sort.SortConfigOperatorFactory;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -50,6 +52,10 @@ public class StreamSortConfigListener implements 
SortOperateListener {
 
     @Autowired
     private SortConfigOperatorFactory operatorFactory;
+    @Autowired
+    private InlongGroupService groupService;
+    @Autowired
+    private InlongStreamService streamService;
 
     @Override
     public TaskEvent event() {
@@ -85,12 +91,15 @@ public class StreamSortConfigListener implements 
SortOperateListener {
             return ListenerResult.success();
         }
 
-        InlongGroupInfo groupInfo = form.getGroupInfo();
+        InlongGroupInfo groupInfo = groupService.get(groupId);
         List<StreamSink> streamSinks = streamInfo.getSinkList();
         if (CollectionUtils.isEmpty(streamSinks)) {
             LOGGER.warn("not build sort config for groupId={}, streamId={}, as 
not found any sinks", groupId, streamId);
             return ListenerResult.success();
         }
+        // Read the current information
+        form.setGroupInfo(groupInfo);
+        form.setStreamInfo(streamService.get(groupId, streamId));
 
         List<InlongStreamInfo> streamInfos = 
Collections.singletonList(streamInfo);
         try {

Reply via email to