This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d8cdae17d [INLONG-6216][Manager] Support getting group topic info 
(#6230)
d8cdae17d is described below

commit d8cdae17d8ce1f31c2996eb34210f928c9de3465
Author: vernedeng <[email protected]>
AuthorDate: Tue Oct 25 09:40:25 2022 +0800

    [INLONG-6216][Manager] Support getting group topic info (#6230)
    
    * Support getting group topic info
    
    * Refactor the getTopic API, add getBackupTopic API
    
    Co-authored-by: healchow <[email protected]>
---
 .../inlong/common/constant/ClusterSwitch.java      | 34 +++++++++++
 .../client/api/inner/ClientFactoryTest.java        | 33 ++++------
 .../dao/mapper/InlongClusterEntityMapper.java      |  2 +
 .../dao/mapper/InlongGroupExtEntityMapper.java     |  2 +-
 .../dao/mapper/InlongStreamExtEntityMapper.java    |  3 +
 .../mappers/InlongClusterEntityMapper.xml          |  1 -
 .../mappers/InlongGroupExtEntityMapper.xml         |  3 +-
 .../mappers/InlongStreamExtEntityMapper.xml        |  9 +++
 .../manager/pojo/group/InlongGroupTopicInfo.java   | 29 +++------
 .../pojo/group/kafka/InlongKafkaTopicInfo.java     | 47 ++++++++++++++
 .../pojo/group/none/InlongNoneMqTopicInfo.java     | 44 ++++++++++++++
 .../pojo/group/pulsar/InlongPulsarTopicInfo.java   | 53 ++++++++++++++++
 .../pojo/group/tubemq/InlongTubeMQTopicInfo.java   | 45 ++++++++++++++
 .../service/cluster/InlongClusterService.java      |  9 +++
 .../service/cluster/InlongClusterServiceImpl.java  | 19 ++++++
 .../service/cluster/TubeClusterOperator.java       |  4 +-
 .../service/consume/ConsumeTubeMQOperator.java     |  7 ++-
 .../service/core/impl/ConsumptionServiceImpl.java  |  6 +-
 .../service/group/AbstractGroupOperator.java       | 29 +++++----
 .../manager/service/group/InlongGroupOperator.java | 12 +++-
 ...perator.java => InlongGroupOperator4Kafka.java} | 47 +++++++++++---
 ...erator.java => InlongGroupOperator4NoneMQ.java} |  4 +-
 ...erator.java => InlongGroupOperator4Pulsar.java} | 71 +++++++++++++++-------
 ...erator.java => InlongGroupOperator4TubeMQ.java} | 33 ++++++++--
 .../manager/service/group/InlongGroupService.java  | 13 +++-
 .../service/group/InlongGroupServiceImpl.java      | 58 +++++++++++++++---
 .../service/stream/InlongStreamServiceImpl.java    |  2 +-
 .../web/controller/InlongGroupController.java      | 19 ++++--
 28 files changed, 518 insertions(+), 120 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
new file mode 100644
index 000000000..90816730c
--- /dev/null
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.constant;
+
+/**
+ * Constants of cluster switching
+ */
+public class ClusterSwitch {
+
+    /**
+     * Cluster tag for backup.
+     */
+    public static final String BACKUP_CLUSTER_TAG = "backup_cluster_tag";
+
+    /**
+     * MQ resource for backup, represents the namespace of Pulsar, the topic 
of TubeMQ, etc.
+     */
+    public static final String BACKUP_MQ_RESOURCE = "backup_mq_resource";
+}
diff --git 
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
 
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index 91f28c836..215218ef5 100644
--- 
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ 
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -39,7 +39,6 @@ import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.auth.DefaultAuthentication;
 import org.apache.inlong.manager.common.auth.TokenAuthentication;
 import org.apache.inlong.manager.common.consts.DataNodeType;
-import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
@@ -61,9 +60,9 @@ import 
org.apache.inlong.manager.pojo.group.InlongGroupCountResponse;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
-import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo;
 import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
@@ -82,7 +81,6 @@ import 
org.apache.inlong.manager.pojo.source.autopush.AutoPushSource;
 import org.apache.inlong.manager.pojo.source.file.FileSource;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
 import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
-import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamResponse;
 import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -94,6 +92,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
@@ -477,19 +476,14 @@ class ClientFactoryTest {
 
     @Test
     void getTopic() {
-        InlongGroupTopicInfo expected = new InlongGroupTopicInfo();
+        InlongPulsarTopicInfo expected = new InlongPulsarTopicInfo();
         expected.setInlongGroupId("1");
-        expected.setMqResource("testTopic");
-        expected.setMqType(MQType.TUBEMQ);
-        expected.setPulsarAdminUrl("http://127.0.0.1:8080";);
-        expected.setPulsarServiceUrl("http://127.0.0.1:8081";);
-        expected.setTubeMasterUrl("http://127.0.0.1:8082";);
-        List<InlongStreamBriefInfo> list = new ArrayList<>();
-        expected.setStreamTopics(list);
-        InlongStreamBriefInfo briefInfo = new InlongStreamBriefInfo();
-        briefInfo.setId(1);
-        briefInfo.setInlongGroupId("testgroup");
-        briefInfo.setModifyTime(new Date());
+        expected.setNamespace("testTopic");
+        PulsarClusterInfo clusterInfo = new PulsarClusterInfo();
+        clusterInfo.setUrl("pulsar://127.0.0.1:6650");
+        clusterInfo.setAdminUrl("http://127.0.0.1:8080";);
+        expected.setClusterInfos(Collections.singletonList(clusterInfo));
+        expected.setTopics(new ArrayList<>());
         stubFor(
                 get(urlMatching("/inlong/manager/api/group/getTopic/1.*"))
                         .willReturn(
@@ -497,13 +491,12 @@ class ClientFactoryTest {
                                         Response.success(expected))
                                 ))
         );
-        InlongGroupTopicInfo actual = groupClient.getTopic("1");
+
+        InlongPulsarTopicInfo actual = (InlongPulsarTopicInfo) 
groupClient.getTopic("1");
         Assertions.assertEquals(expected.getInlongGroupId(), 
actual.getInlongGroupId());
         Assertions.assertEquals(expected.getMqType(), actual.getMqType());
-        Assertions.assertEquals(expected.getTubeMasterUrl(), 
actual.getTubeMasterUrl());
-        Assertions.assertEquals(expected.getPulsarAdminUrl(), 
actual.getPulsarAdminUrl());
-        Assertions.assertEquals(expected.getPulsarServiceUrl(), 
actual.getPulsarServiceUrl());
-        Assertions.assertEquals(expected.getStreamTopics(), 
actual.getStreamTopics());
+        Assertions.assertEquals(expected.getTopics().size(), 
actual.getTopics().size());
+        Assertions.assertEquals(expected.getClusterInfos().size(), 
actual.getClusterInfos().size());
     }
 
     @Test
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
index 36ca245df..db7ae4a70 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
@@ -51,6 +51,8 @@ public interface InlongClusterEntityMapper {
      */
     List<SortSourceClusterInfo> selectAllClusters();
 
+    List<InlongClusterEntity> selectByClusterTag(String clusterTag);
+
     int updateById(InlongClusterEntity record);
 
     int updateByIdSelective(InlongClusterEntity record);
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
index 7edd00626..4e7c1d4ab 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
@@ -38,7 +38,7 @@ public interface InlongGroupExtEntityMapper {
 
     int updateByPrimaryKey(InlongGroupExtEntity record);
 
-    InlongGroupExtEntity selectByGroupIdAndKeyName(String groupId, String 
keyName);
+    InlongGroupExtEntity selectByUniqueKey(@Param("groupId") String groupId, 
@Param("keyName") String keyName);
 
     /**
      * Insert data in batches
diff --git 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
index f59e9df94..2c7316bf6 100644
--- 
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
+++ 
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
@@ -44,6 +44,9 @@ public interface InlongStreamExtEntityMapper {
 
     List<InlongStreamExtEntity> selectByRelatedId(@Param("groupId") String 
groupId, @Param("streamId") String streamId);
 
+    InlongStreamExtEntity selectByKey(@Param("groupId") String groupId, 
@Param("streamId") String streamId,
+            @Param("keyName") String keyName);
+
     int updateByPrimaryKey(InlongStreamExtEntity record);
 
     /**
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
index ca518dad6..d9d7fef62 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
@@ -246,5 +246,4 @@
         from inlong_cluster
         where id = #{id,jdbcType=INTEGER}
     </delete>
-
 </mapper>
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
index 7093ccb89..dba70b312 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
@@ -45,7 +45,7 @@
         and is_deleted = 0
     </select>
     <!-- Query the undeleted extended attributes based on inlongGroupId and 
keyName -->
-    <select id="selectByGroupIdAndKeyName" 
resultType="org.apache.inlong.manager.dao.entity.InlongGroupExtEntity">
+    <select id="selectByUniqueKey" 
resultType="org.apache.inlong.manager.dao.entity.InlongGroupExtEntity">
         select
         <include refid="Base_Column_List"/>
         from inlong_group_ext
@@ -53,6 +53,7 @@
         and key_name = #{keyName, jdbcType=VARCHAR}
         and is_deleted = 0
     </select>
+
     <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
         delete
         from inlong_group_ext
diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
index 7e62e6e35..25e4e1a7a 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
@@ -78,6 +78,15 @@
         </if>
         and is_deleted = 0
     </select>
+    <select id="selectByKey" 
resultType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_stream_ext
+        where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+        and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+        and key_name = #{keyName, jdbcType=VARCHAR}
+        and is_deleted = 0
+    </select>
 
     <update id="updateByPrimaryKey" 
parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
         update inlong_stream_ext
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
index e23a2ba78..6817afe72 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
@@ -17,10 +17,11 @@
 
 package org.apache.inlong.manager.pojo.group;
 
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 
 import java.util.List;
 
@@ -29,31 +30,19 @@ import java.util.List;
  */
 @Data
 @ApiModel("Inlong group and topic info")
-public class InlongGroupTopicInfo {
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType")
+public abstract class InlongGroupTopicInfo {
 
     @ApiModelProperty(value = "Inlong group id", required = true)
     private String inlongGroupId;
 
-    @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high 
consistency: PULSAR")
+    @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high 
consistency: PULSAR, or KAFKA")
     private String mqType;
 
-    @ApiModelProperty(value = "MQ resource, TubeMQ topic name, or Pulsar 
namespace name")
-    private String mqResource;
+    @ApiModelProperty(value = "Inlong cluster tag of the current InlongGroup")
+    private String inlongClusterTag;
 
-    @ApiModelProperty(value = "Topic list, TubeMQ corresponds to inlong group, 
there is only 1 topic, "
-            + "Pulsar corresponds to inlong stream, there are multiple topics")
-    private List<InlongStreamBriefInfo> streamTopics;
-
-    @ApiModelProperty(value = "TubeMQ master URL")
-    private String tubeMasterUrl;
-
-    @ApiModelProperty(value = "Pulsar service URL")
-    private String pulsarServiceUrl;
-
-    @ApiModelProperty(value = "Pulsar admin URL")
-    private String pulsarAdminUrl;
-
-    @ApiModelProperty(value = "Kafka admin URL")
-    private String kafkaBootstrapServers;
+    @ApiModelProperty(value = "MQ cluster info list")
+    private List<? extends ClusterInfo> clusterInfos;
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaTopicInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaTopicInfo.java
new file mode 100644
index 000000000..5fed990ad
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaTopicInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.kafka;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+
+import java.util.List;
+
+@Data
+@Builder
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.KAFKA)
+@ApiModel("Inlong kafka group topic info")
+public class InlongKafkaTopicInfo extends InlongGroupTopicInfo {
+
+    @ApiModelProperty(value = "Kafka topics")
+    private List<String> topics;
+
+    public InlongKafkaTopicInfo() {
+        this.setMqType(MQType.KAFKA);
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/none/InlongNoneMqTopicInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/none/InlongNoneMqTopicInfo.java
new file mode 100644
index 000000000..33f8058f5
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/none/InlongNoneMqTopicInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.none;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+
+/**
+ * Inlong group request without MQ.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Inlong group request without MQ")
+@JsonTypeDefine(value = MQType.NONE)
+public class InlongNoneMqTopicInfo extends InlongGroupTopicInfo {
+
+    // no field
+
+    public InlongNoneMqTopicInfo() {
+        this.setMqType(MQType.NONE);
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
new file mode 100644
index 000000000..2ba108428
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.pulsar;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+
+import java.util.List;
+
+@Data
+@Builder
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.PULSAR)
+@ApiModel("Inlong pulsar group topic info")
+public class InlongPulsarTopicInfo extends InlongGroupTopicInfo {
+
+    @ApiModelProperty(value = "Pulsar tenant")
+    private String tenant;
+
+    @ApiModelProperty(value = "Pulsar namespace")
+    private String namespace;
+
+    @ApiModelProperty(value = "Pulsar topics")
+    private List<String> topics;
+
+    public InlongPulsarTopicInfo() {
+        this.setMqType(MQType.PULSAR);
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQTopicInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQTopicInfo.java
new file mode 100644
index 000000000..de694d5f0
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQTopicInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.tubemq;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+
+@Data
+@Builder
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.TUBEMQ)
+@ApiModel("Inlong tube group topic info")
+public class InlongTubeMQTopicInfo extends InlongGroupTopicInfo {
+
+    @ApiModelProperty(value = "TubeMQ topic")
+    private String topic;
+
+    public InlongTubeMQTopicInfo() {
+        this.setMqType(MQType.TUBEMQ);
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 19ea780fb..de4467181 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -119,6 +119,15 @@ public interface InlongClusterService {
      */
     PageResult<ClusterInfo> list(ClusterPageRequest request);
 
+    /**
+     * List clusters by tag and type
+     *
+     * @param clusterTag cluster tag
+     * @param clusterType cluster type
+     * @return cluster info list
+     */
+    List<ClusterInfo> listByTagAndType(String clusterTag, String clusterType);
+
     /**
      * Update cluster information
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 8edf90ff7..7f92149f0 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -332,6 +332,25 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
         return pageResult;
     }
 
+    @Override
+    public List<ClusterInfo> listByTagAndType(String clusterTag, String 
clusterType) {
+        List<InlongClusterEntity> clusterEntities = 
clusterMapper.selectByKey(clusterTag, null, clusterType);
+        if (CollectionUtils.isEmpty(clusterEntities)) {
+            throw new BusinessException(String.format("cannot find any cluster 
by tag %s and type %s",
+                    clusterTag, clusterType));
+        }
+
+        List<ClusterInfo> clusterInfos = clusterEntities.stream()
+                .map(entity -> {
+                    InlongClusterOperator operator = 
clusterOperatorFactory.getInstance(entity.getType());
+                    return operator.getFromEntity(entity);
+                })
+                .collect(Collectors.toList());
+
+        LOGGER.debug("success to list inlong cluster by tag={}", clusterTag);
+        return clusterInfos;
+    }
+
     @Override
     public ClusterInfo getOne(String clusterTag, String name, String type) {
         List<InlongClusterEntity> entityList = 
clusterMapper.selectByKey(clusterTag, name, type);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index 0139dc0ce..7e0c667ef 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -29,7 +29,7 @@ import 
org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
-import org.apache.inlong.manager.service.group.InlongNoneMqOperator;
+import org.apache.inlong.manager.service.group.InlongGroupOperator4NoneMQ;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -41,7 +41,7 @@ import org.springframework.stereotype.Service;
 @Service
 public class TubeClusterOperator extends AbstractClusterOperator {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongNoneMqOperator.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongGroupOperator4NoneMQ.class);
 
     @Autowired
     private ObjectMapper objectMapper;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
index d507c2d2c..f0d535efa 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
@@ -28,6 +28,7 @@ import 
org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
 import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
 import org.apache.inlong.manager.pojo.consume.tubemq.ConsumeTubeMQDTO;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQTopicInfo;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,9 +65,9 @@ public class ConsumeTubeMQOperator extends 
AbstractConsumeOperator {
         Preconditions.checkNotNull(topicInfo, "inlong group not exist: " + 
groupId);
 
         // one inlong group only has one TubeMQ topic
-        String mqResource = topicInfo.getMqResource();
-        Preconditions.checkTrue(Objects.equals(mqResource, request.getTopic()),
-                String.format("inlong consume topic %s not belongs to inlong 
group %s", request.getTopic(), groupId));
+        InlongTubeMQTopicInfo tubeMQTopic = (InlongTubeMQTopicInfo) topicInfo;
+        Preconditions.checkTrue(Objects.equals(tubeMQTopic.getTopic(), 
request.getTopic()),
+                String.format("topic %s for consume not belongs to inlong 
group %s", request.getTopic(), groupId));
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 469309c05..f64eaa192 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -58,6 +58,7 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.CollectionUtils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
@@ -367,14 +368,15 @@ public class ConsumptionServiceImpl implements 
ConsumptionService {
 
         // Tube’s topic is the inlong group level, one inlong group, one 
TubeMQ topic
         String mqType = topicVO.getMqType();
+        // this class was deprecated, just comment it out
         if (MQType.TUBEMQ.equals(mqType)) {
-            String mqResource = topicVO.getMqResource();
+            String mqResource = /*topicVO.getMqResource()*/ null;
             Preconditions.checkTrue(mqResource == null || 
mqResource.equals(info.getTopic()),
                     "topic [" + info.getTopic() + "] not belong to inlong 
group " + groupId);
         } else if (MQType.PULSAR.equals(mqType) || 
MQType.TDMQ_PULSAR.equals(mqType)) {
             // Pulsar's topic is the inlong stream level.
             // There will be multiple inlong streams under one inlong group, 
and there will be multiple topics
-            List<InlongStreamBriefInfo> streamTopics = 
topicVO.getStreamTopics();
+            List<InlongStreamBriefInfo> streamTopics = 
/*topicVO.getStreamTopics()*/ new ArrayList<>();
             if (streamTopics != null && streamTopics.size() > 0) {
                 Set<String> topicSet = new 
HashSet<>(Arrays.asList(info.getTopic().split(",")));
                 streamTopics.forEach(stream -> 
topicSet.remove(stream.getMqResource()));
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
index ae325e298..eed0cba50 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
@@ -17,17 +17,20 @@
 
 package org.apache.inlong.manager.service.group;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
-import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
+import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -41,8 +44,19 @@ public abstract class AbstractGroupOperator implements 
InlongGroupOperator {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractGroupOperator.class);
 
+    @Autowired
+    protected ObjectMapper objectMapper;
+    @Autowired
+    protected InlongStreamService streamService;
+    @Autowired
+    protected InlongClusterService clusterService;
+
     @Autowired
     protected InlongGroupEntityMapper groupMapper;
+    @Autowired
+    protected InlongGroupExtEntityMapper groupExtMapper;
+    @Autowired
+    protected InlongStreamExtEntityMapper streamExtMapper;
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
@@ -90,13 +104,4 @@ public abstract class AbstractGroupOperator implements 
InlongGroupOperator {
         }
     }
 
-    @Override
-    public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
-        InlongGroupTopicInfo topicInfo = new InlongGroupTopicInfo();
-        topicInfo.setInlongGroupId(groupInfo.getInlongGroupId());
-        topicInfo.setMqType(groupInfo.getMqType());
-        topicInfo.setMqResource(groupInfo.getMqResource());
-        return topicInfo;
-    }
-
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator.java
index 3356771a3..1f1abc814 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator.java
@@ -17,10 +17,10 @@
 
 package org.apache.inlong.manager.service.group;
 
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 
 /**
  * Interface of the inlong group operator.
@@ -72,4 +72,14 @@ public interface InlongGroupOperator {
      */
     InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo);
 
+    /**
+     * Get backup topic info for the given inlong group if exists.
+     *
+     * @param groupInfo inlong group info
+     * @return backup topic info
+     */
+    default InlongGroupTopicInfo getBackupTopic(InlongGroupInfo groupInfo) {
+        return null;
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Kafka.java
similarity index 64%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Kafka.java
index 24807e5d2..83d46cade 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Kafka.java
@@ -17,34 +17,37 @@
 
 package org.apache.inlong.manager.service.group;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaDTO;
 import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
 import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaRequest;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaTopicInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_MQ_RESOURCE;
+
 /**
  * Inlong group operator for Kafka.
  */
 @Service
-public class InlongKafkaOperator extends AbstractGroupOperator {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongKafkaOperator.class);
+public class InlongGroupOperator4Kafka extends AbstractGroupOperator {
 
-    @Autowired
-    private ObjectMapper objectMapper;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongGroupOperator4Kafka.class);
 
     @Override
     public Boolean accept(String mqType) {
@@ -88,7 +91,35 @@ public class InlongKafkaOperator extends 
AbstractGroupOperator {
 
     @Override
     public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
-        InlongGroupTopicInfo topicInfo = super.getTopic(groupInfo);
+        InlongKafkaTopicInfo topicInfo = new InlongKafkaTopicInfo();
+        // each inlong stream is associated with a Kafka topic
+        List<String> topics = 
streamService.getTopicList(groupInfo.getInlongGroupId()).stream()
+                .map(InlongStreamBriefInfo::getMqResource)
+                .collect(Collectors.toList());
+        topicInfo.setTopics(topics);
+
+        return topicInfo;
+    }
+
+    @Override
+    public InlongGroupTopicInfo getBackupTopic(InlongGroupInfo groupInfo) {
+        // set backup topics, each inlong stream is associated with a Kafka 
topic
+        String groupId = groupInfo.getInlongGroupId();
+        List<InlongStreamBriefInfo> streamTopics = 
streamService.getTopicList(groupId);
+        streamTopics.forEach(stream -> {
+            InlongStreamExtEntity streamExtEntity = 
streamExtMapper.selectByKey(groupId, stream.getInlongStreamId(),
+                    BACKUP_MQ_RESOURCE);
+            if (streamExtEntity != null && 
StringUtils.isNotBlank(streamExtEntity.getKeyValue())) {
+                stream.setMqResource(streamExtEntity.getKeyValue());
+            }
+        });
+
+        InlongKafkaTopicInfo topicInfo = new InlongKafkaTopicInfo();
+        List<String> topics = streamTopics.stream()
+                .map(InlongStreamBriefInfo::getMqResource)
+                .collect(Collectors.toList());
+        topicInfo.setTopics(topics);
+
         return topicInfo;
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongNoneMqOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4NoneMQ.java
similarity index 95%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongNoneMqOperator.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4NoneMQ.java
index 966156ac6..16114a293 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongNoneMqOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4NoneMQ.java
@@ -34,9 +34,9 @@ import org.springframework.stereotype.Service;
  * Inlong group operator without MQ.
  */
 @Service
-public class InlongNoneMqOperator extends AbstractGroupOperator {
+public class InlongGroupOperator4NoneMQ extends AbstractGroupOperator {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongNoneMqOperator.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongGroupOperator4NoneMQ.class);
 
     @Override
     public Boolean accept(String mqType) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
similarity index 67%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
index d00fe64b4..dc09f2be3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
@@ -17,41 +17,39 @@
 
 package org.apache.inlong.manager.service.group;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_MQ_RESOURCE;
 
 /**
  * Inlong group operator for Pulsar.
  */
 @Service
-public class InlongPulsarOperator extends AbstractGroupOperator {
+public class InlongGroupOperator4Pulsar extends AbstractGroupOperator {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongPulsarOperator.class);
-
-    @Autowired
-    private ObjectMapper objectMapper;
-    @Autowired
-    private InlongStreamService streamService;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongGroupOperator4Pulsar.class);
 
     @Override
     public Boolean accept(String mqType) {
@@ -112,14 +110,43 @@ public class InlongPulsarOperator extends 
AbstractGroupOperator {
 
     @Override
     public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
-        InlongGroupTopicInfo topicInfo = super.getTopic(groupInfo);
-        // Pulsar topic corresponds to the inlong stream one-to-one
-        List<InlongStreamBriefInfo> streamTopics = 
streamService.getTopicList(groupInfo.getInlongGroupId());
-        topicInfo.setStreamTopics(streamTopics);
-        // TODO add cache for cluster info, and support extends different MQs
-        // topicInfo.setTenant();
-        // topicInfo.setAdminUrl();
-        // topicInfo.setServiceUrl();
+        InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo();
+        topicInfo.setNamespace(groupInfo.getMqResource());
+        // each inlong stream is associated with a Pulsar topic
+        List<String> topics = 
streamService.getTopicList(groupInfo.getInlongGroupId()).stream()
+                .map(InlongStreamBriefInfo::getMqResource)
+                .collect(Collectors.toList());
+        topicInfo.setTopics(topics);
+
+        return topicInfo;
+    }
+
+    @Override
+    public InlongGroupTopicInfo getBackupTopic(InlongGroupInfo groupInfo) {
+        // set backup namespace
+        String groupId = groupInfo.getInlongGroupId();
+        InlongGroupExtEntity extEntity = 
groupExtMapper.selectByUniqueKey(groupId, BACKUP_MQ_RESOURCE);
+        InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo();
+        if (extEntity != null && 
StringUtils.isNotBlank(extEntity.getKeyValue())) {
+            topicInfo.setNamespace(extEntity.getKeyValue());
+        } else {
+            topicInfo.setNamespace(groupInfo.getMqResource());
+        }
+
+        // set backup topics, each inlong stream is associated with a Pulsar 
topic
+        List<InlongStreamBriefInfo> streamTopics = 
streamService.getTopicList(groupId);
+        streamTopics.forEach(stream -> {
+            InlongStreamExtEntity streamExtEntity = 
streamExtMapper.selectByKey(groupId, stream.getInlongStreamId(),
+                    BACKUP_MQ_RESOURCE);
+            if (streamExtEntity != null && 
StringUtils.isNotBlank(streamExtEntity.getKeyValue())) {
+                stream.setMqResource(streamExtEntity.getKeyValue());
+            }
+        });
+        List<String> topics = streamTopics.stream()
+                .map(InlongStreamBriefInfo::getMqResource)
+                .collect(Collectors.toList());
+        topicInfo.setTopics(topics);
+
         return topicInfo;
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongTubeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java
similarity index 67%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongTubeOperator.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java
index 660c6527d..b39347410 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongTubeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java
@@ -17,26 +17,31 @@
 
 package org.apache.inlong.manager.service.group;
 
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQInfo;
+import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQTopicInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
+import static 
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_MQ_RESOURCE;
+
 /**
  * Inlong group operator for TubeMQ.
  */
 @Service
-public class InlongTubeOperator extends AbstractGroupOperator {
+public class InlongGroupOperator4TubeMQ extends AbstractGroupOperator {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongTubeOperator.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongGroupOperator4TubeMQ.class);
 
     @Override
     public Boolean accept(String mqType) {
@@ -69,9 +74,25 @@ public class InlongTubeOperator extends 
AbstractGroupOperator {
 
     @Override
     public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
-        // TODO add cache for cluster info
-        // topicInfo.setTubeMasterUrl(groupInfo.getMqType());
-        return super.getTopic(groupInfo);
+        InlongTubeMQTopicInfo topicInfo = new InlongTubeMQTopicInfo();
+        // each inlong group is associated with a TubeMQ topic
+        topicInfo.setTopic(groupInfo.getMqResource());
+        return topicInfo;
+    }
+
+    @Override
+    public InlongGroupTopicInfo getBackupTopic(InlongGroupInfo groupInfo) {
+        // set backup topic, each inlong group is associated with a TubeMQ 
topic
+        InlongTubeMQTopicInfo topicInfo = new InlongTubeMQTopicInfo();
+        InlongGroupExtEntity extEntity = 
groupExtMapper.selectByUniqueKey(groupInfo.getInlongGroupId(),
+                BACKUP_MQ_RESOURCE);
+        if (extEntity != null && 
StringUtils.isNotBlank(extEntity.getKeyValue())) {
+            topicInfo.setTopic(extEntity.getKeyValue());
+        } else {
+            topicInfo.setTopic(groupInfo.getMqResource());
+        }
+
+        return topicInfo;
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index 48dccc0f5..3c07faa28 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -109,12 +109,19 @@ public interface InlongGroupService {
     /**
      * According to the group id, query the topic to which it belongs
      *
-     * @param groupId Inlong group id
-     * @return Topic information
-     * @apiNote TubeMQ corresponds to the group, only 1 topic
+     * @param groupId inlong group id
+     * @return topic info
      */
     InlongGroupTopicInfo getTopic(String groupId);
 
+    /**
+     * According to the group id, query the backup topic to which it belongs
+     *
+     * @param groupId inlong group id
+     * @return backup topic info
+     */
+    InlongGroupTopicInfo getBackupTopic(String groupId);
+
     /**
      * Save the group modified when the approval is passed
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 150c8e317..ddcb6f0c5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -40,6 +40,7 @@ import 
org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
 import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
 import org.apache.inlong.manager.pojo.common.PageResult;
@@ -56,6 +57,7 @@ import 
org.apache.inlong.manager.pojo.sort.BaseSortConf.SortType;
 import org.apache.inlong.manager.pojo.sort.FlinkSortConf;
 import org.apache.inlong.manager.pojo.sort.UserDefinedSortConf;
 import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.source.SourceOperatorFactory;
 import org.apache.inlong.manager.service.source.StreamSourceOperator;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
@@ -77,6 +79,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_CLUSTER_TAG;
 import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
 
 /**
@@ -88,18 +91,21 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongGroupServiceImpl.class);
 
-    @Autowired
-    private InlongGroupOperatorFactory groupOperatorFactory;
     @Autowired
     private InlongGroupEntityMapper groupMapper;
     @Autowired
     private InlongGroupExtEntityMapper groupExtMapper;
     @Autowired
+    private InlongStreamService streamService;
+    @Autowired
     private StreamSourceEntityMapper streamSourceMapper;
     @Autowired
-    private SourceOperatorFactory sourceOperatorFactory;
+    private InlongClusterService clusterService;
+
     @Autowired
-    private InlongStreamService streamService;
+    private InlongGroupOperatorFactory groupOperatorFactory;
+    @Autowired
+    private SourceOperatorFactory sourceOperatorFactory;
 
     /**
      * Check whether modification is supported under the current group status, 
and which fields can be modified.
@@ -363,16 +369,50 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
     public InlongGroupTopicInfo getTopic(String groupId) {
         // the group info will not null in get() method
         InlongGroupInfo groupInfo = this.get(groupId);
+        InlongGroupOperator groupOperator = 
groupOperatorFactory.getInstance(groupInfo.getMqType());
+        InlongGroupTopicInfo topicInfo = groupOperator.getTopic(groupInfo);
 
-        InlongGroupOperator instance = 
groupOperatorFactory.getInstance(groupInfo.getMqType());
-        InlongGroupTopicInfo topicInfo = instance.getTopic(groupInfo);
+        // set the base params
+        topicInfo.setInlongGroupId(groupId);
+        String clusterTag = groupInfo.getInlongClusterTag();
+        topicInfo.setInlongClusterTag(clusterTag);
 
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("success to get topic for groupId={}, result=" + 
topicInfo, groupId);
-        }
+        // assert: each MQ type has a corresponding type of cluster
+        List<ClusterInfo> clusterInfos = 
clusterService.listByTagAndType(clusterTag, groupInfo.getMqType());
+        topicInfo.setClusterInfos(clusterInfos);
+
+        LOGGER.debug("success to get topic for groupId={}, result={}", 
groupId, topicInfo);
         return topicInfo;
     }
 
+    @Override
+    public InlongGroupTopicInfo getBackupTopic(String groupId) {
+        // backup topic info saved in the ext table
+        InlongGroupExtEntity extEntity = 
groupExtMapper.selectByUniqueKey(groupId, BACKUP_CLUSTER_TAG);
+        if (StringUtils.isBlank(extEntity.getKeyValue())) {
+            LOGGER.warn("not found any backup topic for groupId={}", groupId);
+            return null;
+        }
+
+        // the group info will not null in get() method
+        InlongGroupInfo groupInfo = this.get(groupId);
+        InlongGroupOperator groupOperator = 
groupOperatorFactory.getInstance(groupInfo.getMqType());
+        InlongGroupTopicInfo backupTopicInfo = 
groupOperator.getBackupTopic(groupInfo);
+
+        // set the base params
+        backupTopicInfo.setInlongGroupId(groupId);
+        String backupClusterTag = extEntity.getKeyValue();
+        backupTopicInfo.setInlongClusterTag(backupClusterTag);
+
+        // set backup cluster info
+        // assert: each MQ type has a corresponding type of cluster
+        List<ClusterInfo> clusterInfos = 
clusterService.listByTagAndType(backupClusterTag, groupInfo.getMqType());
+        backupTopicInfo.setClusterInfos(clusterInfos);
+
+        LOGGER.debug("success to get backup topic for groupId={}, result={}", 
groupId, backupTopicInfo);
+        return backupTopicInfo;
+    }
+
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW)
     public void updateAfterApprove(InlongGroupApproveRequest approveRequest, 
String operator) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 87775fb29..2afa702ca 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -427,7 +427,7 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         Preconditions.checkNotNull(groupId, 
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
 
         List<InlongStreamBriefInfo> topicList = 
streamMapper.selectBriefList(groupId);
-        LOGGER.debug("success to get topic list by groupId={}", groupId);
+        LOGGER.debug("success to get topic list by groupId={}, result 
size={}", groupId, topicList.size());
         return topicList;
     }
 
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 8f105e10d..959d005e4 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -39,6 +39,7 @@ import 
org.apache.inlong.manager.service.operationlog.OperationLog;
 import org.apache.inlong.manager.service.user.LoginUserUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -104,6 +105,18 @@ public class InlongGroupController {
         return Response.success(groupService.countGroupByUser(operator));
     }
 
+    @GetMapping(value = "/group/getTopic/{groupId}")
+    @ApiOperation(value = "Get topic info")
+    public Response<InlongGroupTopicInfo> getTopic(@PathVariable String 
groupId) {
+        return Response.success(groupService.getTopic(groupId));
+    }
+
+    @GetMapping(value = "/group/getBackupTopic/{groupId}")
+    @ApiOperation(value = "Get backup topic info")
+    public Response<InlongGroupTopicInfo> getBackupTopic(@PathVariable String 
groupId) {
+        return Response.success(groupService.getBackupTopic(groupId));
+    }
+
     @RequestMapping(value = "/group/startProcess/{groupId}", method = 
RequestMethod.POST)
     @ApiOperation(value = "Start inlong approval process")
     @ApiImplicitParam(name = "groupId", value = "Inlong group id", 
dataTypeClass = String.class)
@@ -162,12 +175,6 @@ public class InlongGroupController {
         return 
Response.success(groupProcessOperation.deleteProcessAsync(groupId, operator));
     }
 
-    @RequestMapping(value = "/group/getTopic/{groupId}", method = 
RequestMethod.GET)
-    @ApiOperation(value = "Get topic info")
-    public Response<InlongGroupTopicInfo> getTopic(@PathVariable String 
groupId) {
-        return Response.success(groupService.getTopic(groupId));
-    }
-
     @PostMapping(value = "/group/reset")
     @ApiOperation(value = "Reset group status when group is in 
CONFIG_ING|SUSPENDING|RESTARTING|DELETING")
     public Response<Boolean> reset(@RequestBody @Validated 
InlongGroupResetRequest request) {

Reply via email to