This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6802fc969 [INLONG-4811][Manager] Add delete MQ task in group and 
stream workflow definition (#4813)
6802fc969 is described below

commit 6802fc969fe5cd3bfa66ded27aec85c775d2ef08
Author: kipshi <[email protected]>
AuthorDate: Thu Jun 30 16:59:19 2022 +0800

    [INLONG-4811][Manager] Add delete MQ task in group and stream workflow 
definition (#4813)
---
 .../service/mq/CreateTubeTopicTaskListener.java    |   3 -
 .../mq/DeletePulsarResourceTaskListener.java       | 121 +++++++++++++++++++++
 .../service/mq/DeletePulsarTopicTaskListener.java  | 102 +++++++++++++++++
 ...ctor.java => PulsarResourceCreateSelector.java} |  10 +-
 ...ctor.java => PulsarResourceDeleteSelector.java} |  17 +--
 ...elector.java => PulsarTopicCreateSelector.java} |  31 ++++--
 ...elector.java => PulsarTopicDeleteSelector.java} |  35 +++---
 .../manager/service/mq/util/PulsarOperator.java    |  48 +++++++-
 .../group/DeleteGroupWorkflowDefinition.java       |  11 +-
 .../listener/GroupTaskListenerFactory.java         |  12 +-
 .../listener/StreamTaskListenerFactory.java        |  12 +-
 .../stream/DeleteStreamWorkflowDefinition.java     |  15 ++-
 .../workflow/definition/ServiceTaskType.java       |   1 +
 13 files changed, 368 insertions(+), 50 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
index 50ace8a4a..554a8d298 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
@@ -25,7 +25,6 @@ import 
org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import 
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.mq.util.TubeMQOperator;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -45,8 +44,6 @@ public class CreateTubeTopicTaskListener implements 
QueueOperateListener {
     private InlongClusterService clusterService;
     @Autowired
     private TubeMQOperator tubeMQOperator;
-    @Autowired
-    private InlongGroupService groupService;
 
     @Override
     public TaskEvent event() {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
new file mode 100644
index 000000000..3699cb1dd
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.mq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
+import 
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
+import org.apache.inlong.manager.service.mq.util.PulsarUtils;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
+import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
+import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * Delete the Pulsar topic of every stream in an inlong group (tenant and namespace are left untouched)
+ */
+@Slf4j
+@Component
+public class DeletePulsarResourceTaskListener implements QueueOperateListener {
+
+    @Autowired
+    private InlongGroupService groupService;
+    @Autowired
+    private InlongStreamService streamService;
+    @Autowired
+    private InlongClusterService clusterService;
+    @Autowired
+    private PulsarOperator pulsarOperator;
+
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws Exception {
+        GroupResourceProcessForm form = (GroupResourceProcessForm) 
context.getProcessForm();
+        String groupId = form.getInlongGroupId();
+        log.info("begin to delete pulsar resource for groupId={}", groupId);
+        InlongGroupInfo groupInfo = groupService.get(groupId);
+        if (groupInfo == null) {
+            throw new WorkflowListenerException("inlong group or pulsar 
cluster not found for groupId=" + groupId);
+        }
+        try {
+            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+            InlongClusterInfo clusterInfo = 
clusterService.getOne(groupInfo.getInlongClusterTag(), null,
+                    ClusterType.CLS_PULSAR);
+            deletePulsarProcess(pulsarInfo, (PulsarClusterInfo) clusterInfo);
+        } catch (Exception e) {
+            log.error("delete pulsar resource error for groupId={}", groupId, 
e);
+            throw new WorkflowListenerException("delete pulsar resource error 
for groupId=" + groupId);
+        }
+
+        log.info("success to delete pulsar resource for groupId={}", groupId);
+        return ListenerResult.success();
+    }
+
+    /**
+     * Force-delete the topic of each stream in the group, skipping streams that have no topic
+     */
+    private void deletePulsarProcess(InlongPulsarInfo pulsarInfo, 
PulsarClusterInfo pulsarCluster) throws Exception {
+        String groupId = pulsarInfo.getInlongGroupId();
+        log.info("begin to delete pulsar resource for groupId={} in 
cluster={}", groupId, pulsarCluster);
+        final String tenant = pulsarCluster.getTenant();
+        final String namespace = pulsarInfo.getMqResource();
+        Preconditions.checkNotNull(namespace, "pulsar namespace cannot be 
empty for groupId=" + groupId);
+        try (PulsarAdmin pulsarAdmin = 
PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+            List<InlongStreamBriefInfo> streamTopicList = 
streamService.getTopicList(groupId);
+            for (InlongStreamBriefInfo streamInfo : streamTopicList) {
+                final String topic = streamInfo.getMqResource();
+                if (topic == null) {
+                    continue;
+                }
+                PulsarTopicBean topicBean = PulsarTopicBean.builder()
+                        .tenant(tenant)
+                        .namespace(namespace)
+                        .topicName(topic)
+                        .build();
+                pulsarOperator.forceDeleteTopic(pulsarAdmin, topicBean);
+            }
+        }
+        log.info("finish to delete pulsar resource for groupId={}, 
cluster={}", groupId, pulsarCluster);
+    }
+
+    @Override
+    public boolean async() {
+        return false;
+    }
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
new file mode 100644
index 000000000..b7c279b19
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.mq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import 
org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
+import org.apache.inlong.manager.service.mq.util.PulsarUtils;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
+import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
+import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Delete the Pulsar topic of an inlong stream, falling back to the default tenant when the cluster has none.
+ */
+@Slf4j
+@Component
+public class DeletePulsarTopicTaskListener implements QueueOperateListener {
+
+    @Autowired
+    private InlongClusterService clusterService;
+    @Autowired
+    private PulsarOperator pulsarOperator;
+
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws Exception {
+        StreamResourceProcessForm form = (StreamResourceProcessForm) 
context.getProcessForm();
+        InlongGroupInfo groupInfo = form.getGroupInfo();
+        InlongStreamInfo streamInfo = form.getStreamInfo();
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+        log.info("begin to delete pulsar topic for groupId={}, streamId={}", 
groupId, streamId);
+
+        try {
+            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+            String pulsarTopic = streamInfo.getMqResource();
+            String clusterTag = pulsarInfo.getInlongClusterTag();
+            InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, 
null, ClusterType.CLS_PULSAR);
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+            String tenant = pulsarCluster.getTenant();
+            if (StringUtils.isEmpty(tenant)) {
+                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+            }
+
+            try (PulsarAdmin pulsarAdmin = 
PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+                PulsarTopicBean topicBean = PulsarTopicBean.builder()
+                        .tenant(tenant)
+                        .namespace(pulsarInfo.getMqResource())
+                        .topicName(pulsarTopic)
+                        .build();
+                pulsarOperator.forceDeleteTopic(pulsarAdmin, topicBean);
+            }
+        } catch (Exception e) {
+            String msg = String.format("failed to delete pulsar topic for 
groupId=%s, streamId=%s", groupId, streamId);
+            log.error(msg, e);
+            throw new WorkflowListenerException(msg);
+        }
+
+        log.info("success to delete pulsar topic for groupId={}, streamId={}", 
groupId, streamId);
+        return ListenerResult.success();
+    }
+
+    @Override
+    public boolean async() {
+        return false;
+    }
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java
similarity index 88%
copy from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
copy to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java
index d9f33b2a9..df5f035ae 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
@@ -28,10 +29,10 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 
 /**
- * Selector of pulsar event for create pulsar resource.
+ * Selector of pulsar event for creating pulsar resource.
  */
 @Slf4j
-public class PulsarEventSelector implements EventSelector {
+public class PulsarResourceCreateSelector implements EventSelector {
 
     @Override
     public boolean accept(WorkflowContext context) {
@@ -39,8 +40,11 @@ public class PulsarEventSelector implements EventSelector {
         if (!(processForm instanceof GroupResourceProcessForm)) {
             return false;
         }
-
         GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+        GroupOperateType operateType = form.getGroupOperateType();
+        if (operateType != GroupOperateType.INIT) {
+            return false;
+        }
         String groupId = form.getInlongGroupId();
         InlongGroupInfo groupInfo = form.getGroupInfo();
         MQType mqType = MQType.forType(groupInfo.getMqType());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java
similarity index 82%
copy from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
copy to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java
index d9f33b2a9..9d9e83625 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
@@ -28,10 +29,10 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 
 /**
- * Selector of pulsar event for create pulsar resource.
+ * Selector of pulsar event for deleting pulsar resource
  */
 @Slf4j
-public class PulsarEventSelector implements EventSelector {
+public class PulsarResourceDeleteSelector implements EventSelector {
 
     @Override
     public boolean accept(WorkflowContext context) {
@@ -39,8 +40,11 @@ public class PulsarEventSelector implements EventSelector {
         if (!(processForm instanceof GroupResourceProcessForm)) {
             return false;
         }
-
         GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+        GroupOperateType operateType = form.getGroupOperateType();
+        if (operateType != GroupOperateType.DELETE) {
+            return false;
+        }
         String groupId = form.getInlongGroupId();
         InlongGroupInfo groupInfo = form.getGroupInfo();
         MQType mqType = MQType.forType(groupInfo.getMqType());
@@ -48,16 +52,15 @@ public class PulsarEventSelector implements EventSelector {
             InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
             boolean enable = 
InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
             if (enable) {
-                log.info("need to create pulsar resource as the createResource 
was true for groupId [{}]", groupId);
+                log.info("need to delete pulsar resource as the createResource 
was true for groupId [{}]", groupId);
                 return true;
             } else {
-                log.info("skip to create pulsar resource as the createResource 
was false for groupId [{}]", groupId);
+                log.info("skip to delete pulsar resource as the createResource 
was false for groupId [{}]", groupId);
                 return false;
             }
         }
 
-        log.warn("skip to create pulsar subscription as the mq type is {} for 
groupId [{}]", mqType, groupId);
+        log.warn("skip to delete pulsar subscription as the mq type is {} for 
groupId [{}]", mqType, groupId);
         return false;
     }
-
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicSelector.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java
similarity index 57%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicSelector.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java
index 82a7b8fa6..aefb64f66 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicSelector.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java
@@ -18,15 +18,17 @@
 package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import 
org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 
 @Slf4j
-public class PulsarTopicSelector implements EventSelector {
+public class PulsarTopicCreateSelector implements EventSelector {
 
     @Override
     public boolean accept(WorkflowContext context) {
@@ -35,14 +37,27 @@ public class PulsarTopicSelector implements EventSelector {
             return false;
         }
         StreamResourceProcessForm streamResourceProcessForm = 
(StreamResourceProcessForm) processForm;
+        GroupOperateType operateType = 
streamResourceProcessForm.getGroupOperateType();
+        if (operateType != GroupOperateType.INIT) {
+            return false;
+        }
         MQType mqType = 
MQType.forType(streamResourceProcessForm.getGroupInfo().getMqType());
+        String groupId = 
streamResourceProcessForm.getGroupInfo().getInlongGroupId();
+        String streamId = 
streamResourceProcessForm.getStreamInfo().getInlongStreamId();
         if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
-            return true;
-        } else {
-            InlongStreamInfo streamInfo = 
streamResourceProcessForm.getStreamInfo();
-            log.warn("no need to create pulsar topic for groupId={}, 
streamId={}, as the middlewareType={}",
-                    streamInfo.getInlongGroupId(), 
streamInfo.getInlongStreamId(), mqType);
-            return false;
+            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) 
streamResourceProcessForm.getGroupInfo();
+            boolean enable = 
InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
+            if (enable) {
+                log.info("need to create pulsar topic as the createResource 
was true for groupId [{}] streamId [{}]",
+                        groupId, streamId);
+                return true;
+            } else {
+                log.info("skip to create pulsar topic as the createResource 
was false for groupId [{}] streamId [{}]",
+                        groupId, streamId);
+                return false;
+            }
         }
+        return false;
+
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java
similarity index 58%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java
index d9f33b2a9..a10b64df5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java
@@ -19,45 +19,44 @@ package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import 
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import 
org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 
-/**
- * Selector of pulsar event for create pulsar resource.
- */
 @Slf4j
-public class PulsarEventSelector implements EventSelector {
+public class PulsarTopicDeleteSelector implements EventSelector {
 
     @Override
     public boolean accept(WorkflowContext context) {
         ProcessForm processForm = context.getProcessForm();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
+        if (!(processForm instanceof StreamResourceProcessForm)) {
             return false;
         }
-
-        GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
-        String groupId = form.getInlongGroupId();
-        InlongGroupInfo groupInfo = form.getGroupInfo();
-        MQType mqType = MQType.forType(groupInfo.getMqType());
+        StreamResourceProcessForm streamResourceProcessForm = 
(StreamResourceProcessForm) processForm;
+        GroupOperateType operateType = 
streamResourceProcessForm.getGroupOperateType();
+        if (operateType != GroupOperateType.DELETE) {
+            return false;
+        }
+        MQType mqType = 
MQType.forType(streamResourceProcessForm.getGroupInfo().getMqType());
+        String groupId = 
streamResourceProcessForm.getGroupInfo().getInlongGroupId();
+        String streamId = 
streamResourceProcessForm.getStreamInfo().getInlongStreamId();
         if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
-            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) 
streamResourceProcessForm.getGroupInfo();
             boolean enable = 
InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
             if (enable) {
-                log.info("need to create pulsar resource as the createResource 
was true for groupId [{}]", groupId);
+                log.info("need to delete pulsar topic as the createResource 
was true for groupId [{}] streamId [{}]",
+                        groupId, streamId);
                 return true;
             } else {
-                log.info("skip to create pulsar resource as the createResource 
was false for groupId [{}]", groupId);
+                log.info("skip to delete pulsar topic as the createResource 
was false for groupId [{}] streamId [{}]",
+                        groupId, streamId);
                 return false;
             }
         }
-
-        log.warn("skip to create pulsar subscription as the mq type is {} for 
groupId [{}]", mqType, groupId);
         return false;
     }
-
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
index d8c66e5b9..00fcb5589 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
@@ -91,7 +91,6 @@ public class PulsarOperator {
             List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
             Namespaces namespaces = pulsarAdmin.namespaces();
             namespaces.createNamespace(namespaceName, 
Sets.newHashSet(clusters));
-
             // Configure message TTL
             Integer ttl = pulsarInfo.getTtl();
             if (ttl > 0) {
@@ -126,6 +125,30 @@ public class PulsarOperator {
         }
     }
 
+    /** Delete the namespace with the force flag set; if the namespace does not exist, only warn and return. */
+    public void forceDeleteNamespace(PulsarAdmin pulsarAdmin, String tenant, String namespace)
+            throws PulsarAdminException {
+        Preconditions.checkNotNull(tenant, "pulsar tenant cannot be empty during delete namespace");
+        Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty during delete namespace");
+
+        String namespaceName = tenant + "/" + namespace;
+        LOGGER.info("begin to delete namespace={}", namespaceName);
+
+        try {
+            boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
+            if (!isExists) {
+                LOGGER.warn("namespace={} already delete", namespaceName);
+                return;
+            }
+            Namespaces namespaces = pulsarAdmin.namespaces();
+            namespaces.deleteNamespace(namespaceName, true);
+            LOGGER.info("success to delete namespace={}", namespaceName);
+        } catch (PulsarAdminException e) {
+            LOGGER.error("failed to delete namespace=" + namespaceName, e);
+            throw e;
+        }
+    }
+
     public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean 
topicBean) throws PulsarAdminException {
         Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be 
empty");
 
@@ -163,6 +186,29 @@ public class PulsarOperator {
         }
     }
 
+    /** Delete the given topic with the force flag set; if the topic does not exist, only warn and return. */
+    public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
+        Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
+
+        String tenant = topicBean.getTenant();
+        String namespace = topicBean.getNamespace();
+        String topic = topicBean.getTopicName();
+        String topicFullName = tenant + "/" + namespace + "/" + topic;
+
+        if (!topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
+            LOGGER.warn("pulsar topic={} already delete", topicFullName);
+            return;
+        }
+
+        try {
+            pulsarAdmin.topics().delete(topicFullName, true);
+            LOGGER.info("success to delete topic={}", topicFullName);
+        } catch (PulsarAdminException e) {
+            LOGGER.error("failed to delete topic=" + topicFullName, e);
+            throw e;
+        }
+    }
+
     public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean 
topicBean, String subscription)
             throws PulsarAdminException {
         Preconditions.checkNotNull(topicBean, "can not find tenant information 
to create subscription");
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index 85bb575bb..9e2e4ed26 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -75,6 +75,14 @@ public class DeleteGroupWorkflowDefinition implements 
WorkflowDefinition {
         deleteDataSourceTask.addListenerProvider(groupTaskListenerFactory);
         process.addTask(deleteDataSourceTask);
 
+        //delete MQ
+        ServiceTask deleteMqTask = new ServiceTask();
+        deleteMqTask.setName("deleteMQ");
+        deleteMqTask.setDisplayName("Group-DeleteMQ");
+        deleteMqTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
+        deleteMqTask.addListenerProvider(groupTaskListenerFactory);
+        process.addTask(deleteMqTask);
+
         //delete sort
         ServiceTask deleteSortTask = new ServiceTask();
         deleteSortTask.setName("deleteSort");
@@ -88,7 +96,8 @@ public class DeleteGroupWorkflowDefinition implements 
WorkflowDefinition {
         process.setEndEvent(endEvent);
 
         startEvent.addNext(deleteDataSourceTask);
-        deleteDataSourceTask.addNext(deleteSortTask);
+        deleteDataSourceTask.addNext(deleteMqTask);
+        deleteMqTask.addNext(deleteSortTask);
         deleteSortTask.addNext(endEvent);
 
         return process;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
index 260962624..8ec31e3ef 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
@@ -24,7 +24,9 @@ import 
org.apache.inlong.manager.service.mq.CreatePulsarGroupTaskListener;
 import org.apache.inlong.manager.service.mq.CreatePulsarResourceTaskListener;
 import org.apache.inlong.manager.service.mq.CreateTubeGroupTaskListener;
 import org.apache.inlong.manager.service.mq.CreateTubeTopicTaskListener;
-import org.apache.inlong.manager.service.mq.PulsarEventSelector;
+import org.apache.inlong.manager.service.mq.DeletePulsarResourceTaskListener;
+import org.apache.inlong.manager.service.mq.PulsarResourceCreateSelector;
+import org.apache.inlong.manager.service.mq.PulsarResourceDeleteSelector;
 import org.apache.inlong.manager.service.mq.TubeEventSelector;
 import org.apache.inlong.manager.service.resource.SinkResourceListener;
 import org.apache.inlong.manager.service.sort.CreateSortConfigListenerV2;
@@ -86,6 +88,8 @@ public class GroupTaskListenerFactory implements 
PluginBinder, ServiceTaskListen
     private CreatePulsarResourceTaskListener createPulsarResourceTaskListener;
     @Autowired
     private CreatePulsarGroupTaskListener createPulsarGroupTaskListener;
+    @Autowired
+    private DeletePulsarResourceTaskListener deletePulsarResourceTaskListener;
 
     @Autowired
     private SinkResourceListener sinkResourceListener;
@@ -105,8 +109,9 @@ public class GroupTaskListenerFactory implements 
PluginBinder, ServiceTaskListen
         queueOperateListeners = new LinkedHashMap<>();
         queueOperateListeners.put(createTubeTopicTaskListener, new 
TubeEventSelector());
         queueOperateListeners.put(createTubeGroupTaskListener, new 
TubeEventSelector());
-        queueOperateListeners.put(createPulsarResourceTaskListener, new 
PulsarEventSelector());
-        queueOperateListeners.put(createPulsarGroupTaskListener, new 
PulsarEventSelector());
+        queueOperateListeners.put(createPulsarResourceTaskListener, new 
PulsarResourceCreateSelector());
+        queueOperateListeners.put(createPulsarGroupTaskListener, new 
PulsarResourceCreateSelector());
+        queueOperateListeners.put(deletePulsarResourceTaskListener, new 
PulsarResourceDeleteSelector());
         sortOperateListeners = new LinkedHashMap<>();
         sortOperateListeners.put(createSortConfigListener, new 
ZookeeperDisabledSelector());
         sortOperateListeners.put(lightGroupSortListener, new 
LightGroupSortSelector());
@@ -122,6 +127,7 @@ public class GroupTaskListenerFactory implements 
PluginBinder, ServiceTaskListen
     public List<TaskEventListener> get(WorkflowContext workflowContext, 
ServiceTaskType serviceTaskType) {
         switch (serviceTaskType) {
             case INIT_MQ:
+            case DELETE_MQ:
                 List<QueueOperateListener> queueOperateListeners = 
getQueueOperateListener(workflowContext);
                 return Lists.newArrayList(queueOperateListeners);
             case INIT_SORT:
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
index ec86d0c15..c0e0db916 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
@@ -24,7 +24,9 @@ import 
org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
 import 
org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import 
org.apache.inlong.manager.service.mq.CreatePulsarSubscriptionTaskListener;
 import org.apache.inlong.manager.service.mq.CreatePulsarTopicTaskListener;
-import org.apache.inlong.manager.service.mq.PulsarTopicSelector;
+import org.apache.inlong.manager.service.mq.DeletePulsarTopicTaskListener;
+import org.apache.inlong.manager.service.mq.PulsarTopicCreateSelector;
+import org.apache.inlong.manager.service.mq.PulsarTopicDeleteSelector;
 import org.apache.inlong.manager.service.resource.StreamSinkResourceListener;
 import org.apache.inlong.manager.service.sort.CreateStreamSortConfigListener;
 import org.apache.inlong.manager.service.sort.ZookeeperEnabledSelector;
@@ -65,6 +67,8 @@ public class StreamTaskListenerFactory implements 
PluginBinder, ServiceTaskListe
     @Autowired
     private CreatePulsarSubscriptionTaskListener 
createPulsarSubscriptionTaskListener;
     @Autowired
+    private DeletePulsarTopicTaskListener deletePulsarTopicTaskListener;
+    @Autowired
     private CreateStreamSortConfigListener createSortConfigListener;
     @Autowired
     private StreamSinkResourceListener sinkResourceListener;
@@ -73,8 +77,9 @@ public class StreamTaskListenerFactory implements 
PluginBinder, ServiceTaskListe
     public void init() {
         sourceOperateListeners = new LinkedHashMap<>();
         queueOperateListeners = new LinkedHashMap<>();
-        queueOperateListeners.put(createPulsarTopicTaskListener, new 
PulsarTopicSelector());
-        queueOperateListeners.put(createPulsarSubscriptionTaskListener, new 
PulsarTopicSelector());
+        queueOperateListeners.put(createPulsarTopicTaskListener, new 
PulsarTopicCreateSelector());
+        queueOperateListeners.put(createPulsarSubscriptionTaskListener, new 
PulsarTopicCreateSelector());
+        queueOperateListeners.put(deletePulsarTopicTaskListener, new 
PulsarTopicDeleteSelector());
         sortOperateListeners = new LinkedHashMap<>();
         sortOperateListeners.put(createSortConfigListener, new 
ZookeeperEnabledSelector());
         sinkOperateListeners = new LinkedHashMap<>();
@@ -88,6 +93,7 @@ public class StreamTaskListenerFactory implements 
PluginBinder, ServiceTaskListe
     public Iterable get(WorkflowContext workflowContext, ServiceTaskType 
serviceTaskType) {
         switch (serviceTaskType) {
             case INIT_MQ:
+            case DELETE_MQ:
                 List<QueueOperateListener> queueOperateListeners = 
getQueueOperateListener(workflowContext);
                 return Lists.newArrayList(queueOperateListeners);
             case INIT_SORT:
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
index 25870a6e9..bedb1e06b 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
@@ -67,7 +67,7 @@ public class DeleteStreamWorkflowDefinition implements 
WorkflowDefinition {
         StartEvent startEvent = new StartEvent();
         process.setStartEvent(startEvent);
 
-        //delete datasource
+        // Delete datasource
         ServiceTask deleteDataSourceTask = new ServiceTask();
         deleteDataSourceTask.setName("deleteSource");
         deleteDataSourceTask.setDisplayName("Stream-DeleteSource");
@@ -75,7 +75,15 @@ public class DeleteStreamWorkflowDefinition implements 
WorkflowDefinition {
         deleteDataSourceTask.addListenerProvider(streamTaskListenerFactory);
         process.addTask(deleteDataSourceTask);
 
-        //delete sort
+        // Delete MQ
+        ServiceTask deleteMQTask = new ServiceTask();
+        deleteMQTask.setName("deleteMQ");
+        deleteMQTask.setDisplayName("Stream-DeleteMQ");
+        deleteMQTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
+        deleteMQTask.addListenerProvider(streamTaskListenerFactory);
+        process.addTask(deleteMQTask);
+
+        // Delete sort
         ServiceTask deleteSortTask = new ServiceTask();
         deleteSortTask.setName("deleteSort");
         deleteSortTask.setDisplayName("Stream-DeleteSort");
@@ -88,7 +96,8 @@ public class DeleteStreamWorkflowDefinition implements 
WorkflowDefinition {
         process.setEndEvent(endEvent);
 
         startEvent.addNext(deleteDataSourceTask);
-        deleteDataSourceTask.addNext(deleteSortTask);
+        deleteDataSourceTask.addNext(deleteMQTask);
+        deleteMQTask.addNext(deleteSortTask);
         deleteSortTask.addNext(endEvent);
 
         return process;
diff --git 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
index 80f157a58..2686b4b60 100644
--- 
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
@@ -33,6 +33,7 @@ public enum ServiceTaskType {
     RESTART_SORT,
 
     DELETE_SOURCE,
+    DELETE_MQ,
     DELETE_SORT;
 
     @Override

Reply via email to