This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6802fc969 [INLONG-4811][Manager] Add delete MQ task in group and
stream workflow definition (#4813)
6802fc969 is described below
commit 6802fc969fe5cd3bfa66ded27aec85c775d2ef08
Author: kipshi <[email protected]>
AuthorDate: Thu Jun 30 16:59:19 2022 +0800
[INLONG-4811][Manager] Add delete MQ task in group and stream workflow
definition (#4813)
---
.../service/mq/CreateTubeTopicTaskListener.java | 3 -
.../mq/DeletePulsarResourceTaskListener.java | 121 +++++++++++++++++++++
.../service/mq/DeletePulsarTopicTaskListener.java | 102 +++++++++++++++++
...ctor.java => PulsarResourceCreateSelector.java} | 10 +-
...ctor.java => PulsarResourceDeleteSelector.java} | 17 +--
...elector.java => PulsarTopicCreateSelector.java} | 31 ++++--
...elector.java => PulsarTopicDeleteSelector.java} | 35 +++---
.../manager/service/mq/util/PulsarOperator.java | 48 +++++++-
.../group/DeleteGroupWorkflowDefinition.java | 11 +-
.../listener/GroupTaskListenerFactory.java | 12 +-
.../listener/StreamTaskListenerFactory.java | 12 +-
.../stream/DeleteStreamWorkflowDefinition.java | 15 ++-
.../workflow/definition/ServiceTaskType.java | 1 +
13 files changed, 368 insertions(+), 50 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
index 50ace8a4a..554a8d298 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
@@ -25,7 +25,6 @@ import
org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.mq.util.TubeMQOperator;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -45,8 +44,6 @@ public class CreateTubeTopicTaskListener implements
QueueOperateListener {
private InlongClusterService clusterService;
@Autowired
private TubeMQOperator tubeMQOperator;
- @Autowired
- private InlongGroupService groupService;
@Override
public TaskEvent event() {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
new file mode 100644
index 000000000..3699cb1dd
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarResourceTaskListener.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.mq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
+import
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
+import org.apache.inlong.manager.service.mq.util.PulsarUtils;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
+import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
+import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * Delete Pulsar tenant, namespace and topic after group is operated to delete
+ */
+@Slf4j
+@Component
+public class DeletePulsarResourceTaskListener implements QueueOperateListener {
+
+ @Autowired
+ private InlongGroupService groupService;
+ @Autowired
+ private InlongStreamService streamService;
+ @Autowired
+ private InlongClusterService clusterService;
+ @Autowired
+ private PulsarOperator pulsarOperator;
+
+ @Override
+ public TaskEvent event() {
+ return TaskEvent.COMPLETE;
+ }
+
+ @Override
+ public ListenerResult listen(WorkflowContext context) throws Exception {
+ GroupResourceProcessForm form = (GroupResourceProcessForm)
context.getProcessForm();
+ String groupId = form.getInlongGroupId();
+ log.info("begin to delete pulsar resource for groupId={}", groupId);
+ InlongGroupInfo groupInfo = groupService.get(groupId);
+ if (groupInfo == null) {
+ throw new WorkflowListenerException("inlong group or pulsar
cluster not found for groupId=" + groupId);
+ }
+ try {
+ InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+ InlongClusterInfo clusterInfo =
clusterService.getOne(groupInfo.getInlongClusterTag(), null,
+ ClusterType.CLS_PULSAR);
+ deletePulsarProcess(pulsarInfo, (PulsarClusterInfo) clusterInfo);
+ } catch (Exception e) {
+ log.error("delete pulsar resource error for groupId={}", groupId,
e);
+ throw new WorkflowListenerException("delete pulsar resource error
for groupId=" + groupId);
+ }
+
+ log.info("success to delete pulsar resource for groupId={}", groupId);
+ return ListenerResult.success();
+ }
+
+ /**
+ * Delete all Pulsar topics forcefully
+ */
+ private void deletePulsarProcess(InlongPulsarInfo pulsarInfo,
PulsarClusterInfo pulsarCluster) throws Exception {
+ String groupId = pulsarInfo.getInlongGroupId();
+ log.info("begin to delete pulsar resource for groupId={} in
cluster={}", groupId, pulsarCluster);
+ final String tenant = pulsarCluster.getTenant();
+ final String namespace = pulsarInfo.getMqResource();
+ Preconditions.checkNotNull(namespace, "pulsar namespace cannot be
empty for groupId=" + groupId);
+ try (PulsarAdmin pulsarAdmin =
PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+ List<InlongStreamBriefInfo> streamTopicList =
streamService.getTopicList(groupId);
+ for (InlongStreamBriefInfo streamInfo : streamTopicList) {
+ final String topic = streamInfo.getMqResource();
+ if (topic == null) {
+ continue;
+ }
+ PulsarTopicBean topicBean = PulsarTopicBean.builder()
+ .tenant(tenant)
+ .namespace(namespace)
+ .topicName(topic)
+ .build();
+ pulsarOperator.forceDeleteTopic(pulsarAdmin, topicBean);
+ }
+ }
+ log.info("finish to delete pulsar resource for groupId={},
cluster={}", groupId, pulsarCluster);
+ }
+
+ @Override
+ public boolean async() {
+ return false;
+ }
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
new file mode 100644
index 000000000..b7c279b19
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/DeletePulsarTopicTaskListener.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.mq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import
org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
+import org.apache.inlong.manager.service.mq.util.PulsarUtils;
+import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.event.ListenerResult;
+import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
+import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Delete pulsar topic after stream is operated to delete.
+ */
+@Slf4j
+@Component
+public class DeletePulsarTopicTaskListener implements QueueOperateListener {
+
+ @Autowired
+ private InlongClusterService clusterService;
+ @Autowired
+ private PulsarOperator pulsarOperator;
+
+ @Override
+ public TaskEvent event() {
+ return TaskEvent.COMPLETE;
+ }
+
+ @Override
+ public ListenerResult listen(WorkflowContext context) throws Exception {
+ StreamResourceProcessForm form = (StreamResourceProcessForm)
context.getProcessForm();
+ InlongGroupInfo groupInfo = form.getGroupInfo();
+ InlongStreamInfo streamInfo = form.getStreamInfo();
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ log.info("begin to delete pulsar topic for groupId={}, streamId={}",
groupId, streamId);
+
+ try {
+ InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+ String pulsarTopic = streamInfo.getMqResource();
+ String clusterTag = pulsarInfo.getInlongClusterTag();
+ InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag,
null, ClusterType.CLS_PULSAR);
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+ String tenant = pulsarCluster.getTenant();
+ if (StringUtils.isEmpty(tenant)) {
+ tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ }
+
+ try (PulsarAdmin pulsarAdmin =
PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+ PulsarTopicBean topicBean = PulsarTopicBean.builder()
+ .tenant(tenant)
+ .namespace(pulsarInfo.getMqResource())
+ .topicName(pulsarTopic)
+ .build();
+ pulsarOperator.forceDeleteTopic(pulsarAdmin, topicBean);
+ }
+ } catch (Exception e) {
+ String msg = String.format("failed to delete pulsar topic for
groupId=%s, streamId=%s", groupId, streamId);
+ log.error(msg, e);
+ throw new WorkflowListenerException(msg);
+ }
+
+ log.info("success to delete pulsar topic for groupId={}, streamId={}",
groupId, streamId);
+ return ListenerResult.success();
+ }
+
+ @Override
+ public boolean async() {
+ return false;
+ }
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java
similarity index 88%
copy from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
copy to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java
index d9f33b2a9..df5f035ae 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceCreateSelector.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
@@ -28,10 +29,10 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventSelector;
/**
- * Selector of pulsar event for create pulsar resource.
+ * Selector of pulsar event for creating pulsar resource.
*/
@Slf4j
-public class PulsarEventSelector implements EventSelector {
+public class PulsarResourceCreateSelector implements EventSelector {
@Override
public boolean accept(WorkflowContext context) {
@@ -39,8 +40,11 @@ public class PulsarEventSelector implements EventSelector {
if (!(processForm instanceof GroupResourceProcessForm)) {
return false;
}
-
GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ GroupOperateType operateType = form.getGroupOperateType();
+ if (operateType != GroupOperateType.INIT) {
+ return false;
+ }
String groupId = form.getInlongGroupId();
InlongGroupInfo groupInfo = form.getGroupInfo();
MQType mqType = MQType.forType(groupInfo.getMqType());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java
similarity index 82%
copy from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
copy to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java
index d9f33b2a9..9d9e83625 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarResourceDeleteSelector.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
@@ -28,10 +29,10 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventSelector;
/**
- * Selector of pulsar event for create pulsar resource.
+ * Selector of pulsar event for deleting pulsar resource
*/
@Slf4j
-public class PulsarEventSelector implements EventSelector {
+public class PulsarResourceDeleteSelector implements EventSelector {
@Override
public boolean accept(WorkflowContext context) {
@@ -39,8 +40,11 @@ public class PulsarEventSelector implements EventSelector {
if (!(processForm instanceof GroupResourceProcessForm)) {
return false;
}
-
GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
+ GroupOperateType operateType = form.getGroupOperateType();
+ if (operateType != GroupOperateType.DELETE) {
+ return false;
+ }
String groupId = form.getInlongGroupId();
InlongGroupInfo groupInfo = form.getGroupInfo();
MQType mqType = MQType.forType(groupInfo.getMqType());
@@ -48,16 +52,15 @@ public class PulsarEventSelector implements EventSelector {
InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
boolean enable =
InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
if (enable) {
- log.info("need to create pulsar resource as the createResource
was true for groupId [{}]", groupId);
+ log.info("need to delete pulsar resource as the createResource
was true for groupId [{}]", groupId);
return true;
} else {
- log.info("skip to create pulsar resource as the createResource
was false for groupId [{}]", groupId);
+ log.info("skip to delete pulsar resource as the createResource
was false for groupId [{}]", groupId);
return false;
}
}
- log.warn("skip to create pulsar subscription as the mq type is {} for
groupId [{}]", mqType, groupId);
+ log.warn("skip to delete pulsar subscription as the mq type is {} for
groupId [{}]", mqType, groupId);
return false;
}
-
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicSelector.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java
similarity index 57%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicSelector.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java
index 82a7b8fa6..aefb64f66 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicSelector.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicCreateSelector.java
@@ -18,15 +18,17 @@
package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import
org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventSelector;
@Slf4j
-public class PulsarTopicSelector implements EventSelector {
+public class PulsarTopicCreateSelector implements EventSelector {
@Override
public boolean accept(WorkflowContext context) {
@@ -35,14 +37,27 @@ public class PulsarTopicSelector implements EventSelector {
return false;
}
StreamResourceProcessForm streamResourceProcessForm =
(StreamResourceProcessForm) processForm;
+ GroupOperateType operateType =
streamResourceProcessForm.getGroupOperateType();
+ if (operateType != GroupOperateType.INIT) {
+ return false;
+ }
MQType mqType =
MQType.forType(streamResourceProcessForm.getGroupInfo().getMqType());
+ String groupId =
streamResourceProcessForm.getGroupInfo().getInlongGroupId();
+ String streamId =
streamResourceProcessForm.getStreamInfo().getInlongStreamId();
if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
- return true;
- } else {
- InlongStreamInfo streamInfo =
streamResourceProcessForm.getStreamInfo();
- log.warn("no need to create pulsar topic for groupId={},
streamId={}, as the middlewareType={}",
- streamInfo.getInlongGroupId(),
streamInfo.getInlongStreamId(), mqType);
- return false;
+ InlongPulsarInfo pulsarInfo = (InlongPulsarInfo)
streamResourceProcessForm.getGroupInfo();
+ boolean enable =
InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
+ if (enable) {
+ log.info("need to create pulsar topic as the createResource
was true for groupId [{}] streamId [{}]",
+ groupId, streamId);
+ return true;
+ } else {
+ log.info("skip to create pulsar topic as the createResource
was false for groupId [{}] streamId [{}]",
+ groupId, streamId);
+ return false;
+ }
}
+ return false;
+
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java
similarity index 58%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java
index d9f33b2a9..a10b64df5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarEventSelector.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/PulsarTopicDeleteSelector.java
@@ -19,45 +19,44 @@ package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import
org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import
org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventSelector;
-/**
- * Selector of pulsar event for create pulsar resource.
- */
@Slf4j
-public class PulsarEventSelector implements EventSelector {
+public class PulsarTopicDeleteSelector implements EventSelector {
@Override
public boolean accept(WorkflowContext context) {
ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof GroupResourceProcessForm)) {
+ if (!(processForm instanceof StreamResourceProcessForm)) {
return false;
}
-
- GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
- String groupId = form.getInlongGroupId();
- InlongGroupInfo groupInfo = form.getGroupInfo();
- MQType mqType = MQType.forType(groupInfo.getMqType());
+ StreamResourceProcessForm streamResourceProcessForm =
(StreamResourceProcessForm) processForm;
+ GroupOperateType operateType =
streamResourceProcessForm.getGroupOperateType();
+ if (operateType != GroupOperateType.DELETE) {
+ return false;
+ }
+ MQType mqType =
MQType.forType(streamResourceProcessForm.getGroupInfo().getMqType());
+ String groupId =
streamResourceProcessForm.getGroupInfo().getInlongGroupId();
+ String streamId =
streamResourceProcessForm.getStreamInfo().getInlongStreamId();
if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
- InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+ InlongPulsarInfo pulsarInfo = (InlongPulsarInfo)
streamResourceProcessForm.getGroupInfo();
boolean enable =
InlongConstants.ENABLE_CREATE_RESOURCE.equals(pulsarInfo.getEnableCreateResource());
if (enable) {
- log.info("need to create pulsar resource as the createResource
was true for groupId [{}]", groupId);
+ log.info("need to delete pulsar topic as the createResource
was true for groupId [{}] streamId [{}]",
+ groupId, streamId);
return true;
} else {
- log.info("skip to create pulsar resource as the createResource
was false for groupId [{}]", groupId);
+ log.info("skip to delete pulsar topic as the createResource
was false for groupId [{}] streamId [{}]",
+ groupId, streamId);
return false;
}
}
-
- log.warn("skip to create pulsar subscription as the mq type is {} for
groupId [{}]", mqType, groupId);
return false;
}
-
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
index d8c66e5b9..00fcb5589 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
@@ -91,7 +91,6 @@ public class PulsarOperator {
List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
Namespaces namespaces = pulsarAdmin.namespaces();
namespaces.createNamespace(namespaceName,
Sets.newHashSet(clusters));
-
// Configure message TTL
Integer ttl = pulsarInfo.getTtl();
if (ttl > 0) {
@@ -126,6 +125,30 @@ public class PulsarOperator {
}
}
+ public void forceDeleteNamespace(PulsarAdmin pulsarAdmin, String tenant,
String namespace)
+ throws PulsarAdminException {
+ Preconditions.checkNotNull(tenant, "pulsar tenant cannot be empty
during create namespace");
+ Preconditions.checkNotNull(namespace, "pulsar namespace cannot be
empty during create namespace");
+
+ String namespaceName = tenant + "/" + namespace;
+ LOGGER.info("begin to delete namespace={}", namespaceName);
+
+ try {
+ // Check whether the namespace exists, and create it if it does
not exist
+ boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant,
namespace);
+ if (!isExists) {
+ LOGGER.warn("namespace={} already delete", namespaceName);
+ return;
+ }
+ Namespaces namespaces = pulsarAdmin.namespaces();
+ namespaces.deleteNamespace(namespaceName, true);
+ LOGGER.info("success to delete namespace={}", namespaceName);
+ } catch (PulsarAdminException e) {
+ LOGGER.error("failed to delete namespace=" + namespaceName, e);
+ throw e;
+ }
+ }
+
public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean
topicBean) throws PulsarAdminException {
Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be
empty");
@@ -163,6 +186,29 @@ public class PulsarOperator {
}
}
+ public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean
topicBean) throws PulsarAdminException {
+ Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be
empty");
+
+ String tenant = topicBean.getTenant();
+ String namespace = topicBean.getNamespace();
+ String topic = topicBean.getTopicName();
+ String topicFullName = tenant + "/" + namespace + "/" + topic;
+
+ // Topic will be returned if it not exists
+ if (topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
+ LOGGER.warn("pulsar topic={} already delete", topicFullName);
+ return;
+ }
+
+ try {
+ pulsarAdmin.topics().delete(topicFullName, true);
+ LOGGER.info("success to delete topic={}", topicFullName);
+ } catch (PulsarAdminException e) {
+ LOGGER.error("failed to delete topic=" + topicFullName, e);
+ throw e;
+ }
+ }
+
public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean
topicBean, String subscription)
throws PulsarAdminException {
Preconditions.checkNotNull(topicBean, "can not find tenant information
to create subscription");
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index 85bb575bb..9e2e4ed26 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -75,6 +75,14 @@ public class DeleteGroupWorkflowDefinition implements
WorkflowDefinition {
deleteDataSourceTask.addListenerProvider(groupTaskListenerFactory);
process.addTask(deleteDataSourceTask);
+ //delete MQ
+ ServiceTask deleteMqTask = new ServiceTask();
+ deleteMqTask.setName("deleteMQ");
+ deleteMqTask.setDisplayName("Group-DeleteMQ");
+ deleteMqTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
+ deleteMqTask.addListenerProvider(groupTaskListenerFactory);
+ process.addTask(deleteMqTask);
+
//delete sort
ServiceTask deleteSortTask = new ServiceTask();
deleteSortTask.setName("deleteSort");
@@ -88,7 +96,8 @@ public class DeleteGroupWorkflowDefinition implements
WorkflowDefinition {
process.setEndEvent(endEvent);
startEvent.addNext(deleteDataSourceTask);
- deleteDataSourceTask.addNext(deleteSortTask);
+ deleteDataSourceTask.addNext(deleteMqTask);
+ deleteMqTask.addNext(deleteSortTask);
deleteSortTask.addNext(endEvent);
return process;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
index 260962624..8ec31e3ef 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
@@ -24,7 +24,9 @@ import
org.apache.inlong.manager.service.mq.CreatePulsarGroupTaskListener;
import org.apache.inlong.manager.service.mq.CreatePulsarResourceTaskListener;
import org.apache.inlong.manager.service.mq.CreateTubeGroupTaskListener;
import org.apache.inlong.manager.service.mq.CreateTubeTopicTaskListener;
-import org.apache.inlong.manager.service.mq.PulsarEventSelector;
+import org.apache.inlong.manager.service.mq.DeletePulsarResourceTaskListener;
+import org.apache.inlong.manager.service.mq.PulsarResourceCreateSelector;
+import org.apache.inlong.manager.service.mq.PulsarResourceDeleteSelector;
import org.apache.inlong.manager.service.mq.TubeEventSelector;
import org.apache.inlong.manager.service.resource.SinkResourceListener;
import org.apache.inlong.manager.service.sort.CreateSortConfigListenerV2;
@@ -86,6 +88,8 @@ public class GroupTaskListenerFactory implements
PluginBinder, ServiceTaskListen
private CreatePulsarResourceTaskListener createPulsarResourceTaskListener;
@Autowired
private CreatePulsarGroupTaskListener createPulsarGroupTaskListener;
+ @Autowired
+ private DeletePulsarResourceTaskListener deletePulsarResourceTaskListener;
@Autowired
private SinkResourceListener sinkResourceListener;
@@ -105,8 +109,9 @@ public class GroupTaskListenerFactory implements
PluginBinder, ServiceTaskListen
queueOperateListeners = new LinkedHashMap<>();
queueOperateListeners.put(createTubeTopicTaskListener, new
TubeEventSelector());
queueOperateListeners.put(createTubeGroupTaskListener, new
TubeEventSelector());
- queueOperateListeners.put(createPulsarResourceTaskListener, new
PulsarEventSelector());
- queueOperateListeners.put(createPulsarGroupTaskListener, new
PulsarEventSelector());
+ queueOperateListeners.put(createPulsarResourceTaskListener, new
PulsarResourceCreateSelector());
+ queueOperateListeners.put(createPulsarGroupTaskListener, new
PulsarResourceCreateSelector());
+ queueOperateListeners.put(deletePulsarResourceTaskListener, new
PulsarResourceDeleteSelector());
sortOperateListeners = new LinkedHashMap<>();
sortOperateListeners.put(createSortConfigListener, new
ZookeeperDisabledSelector());
sortOperateListeners.put(lightGroupSortListener, new
LightGroupSortSelector());
@@ -122,6 +127,7 @@ public class GroupTaskListenerFactory implements
PluginBinder, ServiceTaskListen
public List<TaskEventListener> get(WorkflowContext workflowContext,
ServiceTaskType serviceTaskType) {
switch (serviceTaskType) {
case INIT_MQ:
+ case DELETE_MQ:
List<QueueOperateListener> queueOperateListeners =
getQueueOperateListener(workflowContext);
return Lists.newArrayList(queueOperateListeners);
case INIT_SORT:
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
index ec86d0c15..c0e0db916 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/StreamTaskListenerFactory.java
@@ -24,7 +24,9 @@ import
org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import
org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import
org.apache.inlong.manager.service.mq.CreatePulsarSubscriptionTaskListener;
import org.apache.inlong.manager.service.mq.CreatePulsarTopicTaskListener;
-import org.apache.inlong.manager.service.mq.PulsarTopicSelector;
+import org.apache.inlong.manager.service.mq.DeletePulsarTopicTaskListener;
+import org.apache.inlong.manager.service.mq.PulsarTopicCreateSelector;
+import org.apache.inlong.manager.service.mq.PulsarTopicDeleteSelector;
import org.apache.inlong.manager.service.resource.StreamSinkResourceListener;
import org.apache.inlong.manager.service.sort.CreateStreamSortConfigListener;
import org.apache.inlong.manager.service.sort.ZookeeperEnabledSelector;
@@ -65,6 +67,8 @@ public class StreamTaskListenerFactory implements
PluginBinder, ServiceTaskListe
@Autowired
private CreatePulsarSubscriptionTaskListener
createPulsarSubscriptionTaskListener;
@Autowired
+ private DeletePulsarTopicTaskListener deletePulsarTopicTaskListener;
+ @Autowired
private CreateStreamSortConfigListener createSortConfigListener;
@Autowired
private StreamSinkResourceListener sinkResourceListener;
@@ -73,8 +77,9 @@ public class StreamTaskListenerFactory implements
PluginBinder, ServiceTaskListe
public void init() {
sourceOperateListeners = new LinkedHashMap<>();
queueOperateListeners = new LinkedHashMap<>();
- queueOperateListeners.put(createPulsarTopicTaskListener, new
PulsarTopicSelector());
- queueOperateListeners.put(createPulsarSubscriptionTaskListener, new
PulsarTopicSelector());
+ queueOperateListeners.put(createPulsarTopicTaskListener, new
PulsarTopicCreateSelector());
+ queueOperateListeners.put(createPulsarSubscriptionTaskListener, new
PulsarTopicCreateSelector());
+ queueOperateListeners.put(deletePulsarTopicTaskListener, new
PulsarTopicDeleteSelector());
sortOperateListeners = new LinkedHashMap<>();
sortOperateListeners.put(createSortConfigListener, new
ZookeeperEnabledSelector());
sinkOperateListeners = new LinkedHashMap<>();
@@ -88,6 +93,7 @@ public class StreamTaskListenerFactory implements
PluginBinder, ServiceTaskListe
public Iterable get(WorkflowContext workflowContext, ServiceTaskType
serviceTaskType) {
switch (serviceTaskType) {
case INIT_MQ:
+ case DELETE_MQ:
List<QueueOperateListener> queueOperateListeners =
getQueueOperateListener(workflowContext);
return Lists.newArrayList(queueOperateListeners);
case INIT_SORT:
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
index 25870a6e9..bedb1e06b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
@@ -67,7 +67,7 @@ public class DeleteStreamWorkflowDefinition implements
WorkflowDefinition {
StartEvent startEvent = new StartEvent();
process.setStartEvent(startEvent);
- //delete datasource
+ // Delete datasource
ServiceTask deleteDataSourceTask = new ServiceTask();
deleteDataSourceTask.setName("deleteSource");
deleteDataSourceTask.setDisplayName("Stream-DeleteSource");
@@ -75,7 +75,15 @@ public class DeleteStreamWorkflowDefinition implements
WorkflowDefinition {
deleteDataSourceTask.addListenerProvider(streamTaskListenerFactory);
process.addTask(deleteDataSourceTask);
- //delete sort
+ // Delete MQ
+ ServiceTask deleteMQTask = new ServiceTask();
+ deleteMQTask.setName("deleteMQ");
+ deleteMQTask.setDisplayName("Stream-DeleteMQ");
+ deleteMQTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
+ deleteMQTask.addListenerProvider(streamTaskListenerFactory);
+ process.addTask(deleteMQTask);
+
+ // Delete sort
ServiceTask deleteSortTask = new ServiceTask();
deleteSortTask.setName("deleteSort");
deleteSortTask.setDisplayName("Stream-DeleteSort");
@@ -88,7 +96,8 @@ public class DeleteStreamWorkflowDefinition implements
WorkflowDefinition {
process.setEndEvent(endEvent);
startEvent.addNext(deleteDataSourceTask);
- deleteDataSourceTask.addNext(deleteSortTask);
+ deleteDataSourceTask.addNext(deleteMQTask);
+ deleteMQTask.addNext(deleteSortTask);
deleteSortTask.addNext(endEvent);
return process;
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
index 80f157a58..2686b4b60 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/definition/ServiceTaskType.java
@@ -33,6 +33,7 @@ public enum ServiceTaskType {
RESTART_SORT,
DELETE_SOURCE,
+ DELETE_MQ,
DELETE_SORT;
@Override