This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f5d80830f8 [INLONG-9036][Manager] Support create tables for dataSync (#9037)
f5d80830f8 is described below

commit f5d80830f8a4c07fd3acff51d364416112c59403
Author: fuweng11 <[email protected]>
AuthorDate: Tue Oct 10 18:21:37 2023 +0800

    [INLONG-9036][Manager] Support create tables for dataSync (#9037)
---
 .../service/listener/queue/QueueResourceListener.java  | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index 4bab2a146b..b80763d1db 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -95,11 +95,7 @@ public class QueueResourceListener implements QueueOperateListener {
 
     @Override
     public boolean accept(WorkflowContext context) {
-        if (!isGroupProcessForm(context)) {
-            return false;
-        }
-        GroupResourceProcessForm processForm = (GroupResourceProcessForm) 
context.getProcessForm();
-        return 
InlongConstants.STANDARD_MODE.equals(processForm.getGroupInfo().getInlongGroupMode());
+        return isGroupProcessForm(context);
     }
 
     @Override
@@ -117,14 +113,22 @@ public class QueueResourceListener implements QueueOperateListener {
         groupProcessForm.setGroupInfo(groupInfo);
         groupProcessForm.setStreamInfos(streamService.list(groupId));
 
+        String operator = context.getOperator();
+        GroupOperateType operateType = groupProcessForm.getGroupOperateType();
+        if 
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+            log.warn("skip to execute QueueResourceListener as sync mode for 
groupId={}", groupId);
+            if (GroupOperateType.INIT.equals(operateType)) {
+                this.createQueueForStreams(groupInfo, 
groupProcessForm.getStreamInfos(), operator);
+            }
+            return ListenerResult.success("skip - disable create mq resource 
for sync mode");
+        }
+
         if 
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource()))
 {
             log.warn("skip to execute QueueResourceListener as disable create 
resource for groupId={}", groupId);
             return ListenerResult.success("skip - disable create resource");
         }
 
         QueueResourceOperator queueOperator = 
queueOperatorFactory.getInstance(groupInfo.getMqType());
-        GroupOperateType operateType = groupProcessForm.getGroupOperateType();
-        String operator = context.getOperator();
         switch (operateType) {
             case INIT:
                 groupService.updateStatus(groupId, 
GroupStatus.CONFIG_ING.getCode(), operator);

Reply via email to