This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f5d80830f8 [INLONG-9036][Manager] Support create tables for dataSync
(#9037)
f5d80830f8 is described below
commit f5d80830f8a4c07fd3acff51d364416112c59403
Author: fuweng11 <[email protected]>
AuthorDate: Tue Oct 10 18:21:37 2023 +0800
[INLONG-9036][Manager] Support create tables for dataSync (#9037)
---
.../service/listener/queue/QueueResourceListener.java | 18 +++++++++++-------
1 file changed, 11 insertions(+), 7 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index 4bab2a146b..b80763d1db 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -95,11 +95,7 @@ public class QueueResourceListener implements
QueueOperateListener {
@Override
public boolean accept(WorkflowContext context) {
- if (!isGroupProcessForm(context)) {
- return false;
- }
- GroupResourceProcessForm processForm = (GroupResourceProcessForm)
context.getProcessForm();
- return
InlongConstants.STANDARD_MODE.equals(processForm.getGroupInfo().getInlongGroupMode());
+ return isGroupProcessForm(context);
}
@Override
@@ -117,14 +113,22 @@ public class QueueResourceListener implements
QueueOperateListener {
groupProcessForm.setGroupInfo(groupInfo);
groupProcessForm.setStreamInfos(streamService.list(groupId));
+ String operator = context.getOperator();
+ GroupOperateType operateType = groupProcessForm.getGroupOperateType();
+ if
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+ log.warn("skip to execute QueueResourceListener as sync mode for
groupId={}", groupId);
+ if (GroupOperateType.INIT.equals(operateType)) {
+ this.createQueueForStreams(groupInfo,
groupProcessForm.getStreamInfos(), operator);
+ }
+ return ListenerResult.success("skip - disable create mq resource
for sync mode");
+ }
+
if
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource()))
{
log.warn("skip to execute QueueResourceListener as disable create
resource for groupId={}", groupId);
return ListenerResult.success("skip - disable create resource");
}
QueueResourceOperator queueOperator =
queueOperatorFactory.getInstance(groupInfo.getMqType());
- GroupOperateType operateType = groupProcessForm.getGroupOperateType();
- String operator = context.getOperator();
switch (operateType) {
case INIT:
groupService.updateStatus(groupId,
GroupStatus.CONFIG_ING.getCode(), operator);