This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d8cdae17d [INLONG-6216][Manager] Support getting group topic info
(#6230)
d8cdae17d is described below
commit d8cdae17d8ce1f31c2996eb34210f928c9de3465
Author: vernedeng <[email protected]>
AuthorDate: Tue Oct 25 09:40:25 2022 +0800
[INLONG-6216][Manager] Support getting group topic info (#6230)
* Support getting group topic info
* Refactor the getTopic API, add getBackupTopic API
Co-authored-by: healchow <[email protected]>
---
.../inlong/common/constant/ClusterSwitch.java | 34 +++++++++++
.../client/api/inner/ClientFactoryTest.java | 33 ++++------
.../dao/mapper/InlongClusterEntityMapper.java | 2 +
.../dao/mapper/InlongGroupExtEntityMapper.java | 2 +-
.../dao/mapper/InlongStreamExtEntityMapper.java | 3 +
.../mappers/InlongClusterEntityMapper.xml | 1 -
.../mappers/InlongGroupExtEntityMapper.xml | 3 +-
.../mappers/InlongStreamExtEntityMapper.xml | 9 +++
.../manager/pojo/group/InlongGroupTopicInfo.java | 29 +++------
.../pojo/group/kafka/InlongKafkaTopicInfo.java | 47 ++++++++++++++
.../pojo/group/none/InlongNoneMqTopicInfo.java | 44 ++++++++++++++
.../pojo/group/pulsar/InlongPulsarTopicInfo.java | 53 ++++++++++++++++
.../pojo/group/tubemq/InlongTubeMQTopicInfo.java | 45 ++++++++++++++
.../service/cluster/InlongClusterService.java | 9 +++
.../service/cluster/InlongClusterServiceImpl.java | 19 ++++++
.../service/cluster/TubeClusterOperator.java | 4 +-
.../service/consume/ConsumeTubeMQOperator.java | 7 ++-
.../service/core/impl/ConsumptionServiceImpl.java | 6 +-
.../service/group/AbstractGroupOperator.java | 29 +++++----
.../manager/service/group/InlongGroupOperator.java | 12 +++-
...perator.java => InlongGroupOperator4Kafka.java} | 47 +++++++++++---
...erator.java => InlongGroupOperator4NoneMQ.java} | 4 +-
...erator.java => InlongGroupOperator4Pulsar.java} | 71 +++++++++++++++-------
...erator.java => InlongGroupOperator4TubeMQ.java} | 33 ++++++++--
.../manager/service/group/InlongGroupService.java | 13 +++-
.../service/group/InlongGroupServiceImpl.java | 58 +++++++++++++++---
.../service/stream/InlongStreamServiceImpl.java | 2 +-
.../web/controller/InlongGroupController.java | 19 ++++--
28 files changed, 518 insertions(+), 120 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
b/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
new file mode 100644
index 000000000..90816730c
--- /dev/null
+++
b/inlong-common/src/main/java/org/apache/inlong/common/constant/ClusterSwitch.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.constant;
+
+/**
+ * Constants of cluster switching
+ */
+public class ClusterSwitch {
+
+ /**
+ * Cluster tag for backup.
+ */
+ public static final String BACKUP_CLUSTER_TAG = "backup_cluster_tag";
+
+ /**
+ * MQ resource for backup, represents the namespace of Pulsar, the topic
of TubeMQ, etc.
+ */
+ public static final String BACKUP_MQ_RESOURCE = "backup_mq_resource";
+}
diff --git
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index 91f28c836..215218ef5 100644
---
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -39,7 +39,6 @@ import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import org.apache.inlong.manager.common.auth.TokenAuthentication;
import org.apache.inlong.manager.common.consts.DataNodeType;
-import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
@@ -61,9 +60,9 @@ import
org.apache.inlong.manager.pojo.group.InlongGroupCountResponse;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
-import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo;
import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
@@ -82,7 +81,6 @@ import
org.apache.inlong.manager.pojo.source.autopush.AutoPushSource;
import org.apache.inlong.manager.pojo.source.file.FileSource;
import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
-import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamResponse;
import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -94,6 +92,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
@@ -477,19 +476,14 @@ class ClientFactoryTest {
@Test
void getTopic() {
- InlongGroupTopicInfo expected = new InlongGroupTopicInfo();
+ InlongPulsarTopicInfo expected = new InlongPulsarTopicInfo();
expected.setInlongGroupId("1");
- expected.setMqResource("testTopic");
- expected.setMqType(MQType.TUBEMQ);
- expected.setPulsarAdminUrl("http://127.0.0.1:8080");
- expected.setPulsarServiceUrl("http://127.0.0.1:8081");
- expected.setTubeMasterUrl("http://127.0.0.1:8082");
- List<InlongStreamBriefInfo> list = new ArrayList<>();
- expected.setStreamTopics(list);
- InlongStreamBriefInfo briefInfo = new InlongStreamBriefInfo();
- briefInfo.setId(1);
- briefInfo.setInlongGroupId("testgroup");
- briefInfo.setModifyTime(new Date());
+ expected.setNamespace("testTopic");
+ PulsarClusterInfo clusterInfo = new PulsarClusterInfo();
+ clusterInfo.setUrl("pulsar://127.0.0.1:6650");
+ clusterInfo.setAdminUrl("http://127.0.0.1:8080");
+ expected.setClusterInfos(Collections.singletonList(clusterInfo));
+ expected.setTopics(new ArrayList<>());
stubFor(
get(urlMatching("/inlong/manager/api/group/getTopic/1.*"))
.willReturn(
@@ -497,13 +491,12 @@ class ClientFactoryTest {
Response.success(expected))
))
);
- InlongGroupTopicInfo actual = groupClient.getTopic("1");
+
+ InlongPulsarTopicInfo actual = (InlongPulsarTopicInfo)
groupClient.getTopic("1");
Assertions.assertEquals(expected.getInlongGroupId(),
actual.getInlongGroupId());
Assertions.assertEquals(expected.getMqType(), actual.getMqType());
- Assertions.assertEquals(expected.getTubeMasterUrl(),
actual.getTubeMasterUrl());
- Assertions.assertEquals(expected.getPulsarAdminUrl(),
actual.getPulsarAdminUrl());
- Assertions.assertEquals(expected.getPulsarServiceUrl(),
actual.getPulsarServiceUrl());
- Assertions.assertEquals(expected.getStreamTopics(),
actual.getStreamTopics());
+ Assertions.assertEquals(expected.getTopics().size(),
actual.getTopics().size());
+ Assertions.assertEquals(expected.getClusterInfos().size(),
actual.getClusterInfos().size());
}
@Test
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
index 36ca245df..db7ae4a70 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterEntityMapper.java
@@ -51,6 +51,8 @@ public interface InlongClusterEntityMapper {
*/
List<SortSourceClusterInfo> selectAllClusters();
+ List<InlongClusterEntity> selectByClusterTag(String clusterTag);
+
int updateById(InlongClusterEntity record);
int updateByIdSelective(InlongClusterEntity record);
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
index 7edd00626..4e7c1d4ab 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupExtEntityMapper.java
@@ -38,7 +38,7 @@ public interface InlongGroupExtEntityMapper {
int updateByPrimaryKey(InlongGroupExtEntity record);
- InlongGroupExtEntity selectByGroupIdAndKeyName(String groupId, String
keyName);
+ InlongGroupExtEntity selectByUniqueKey(@Param("groupId") String groupId,
@Param("keyName") String keyName);
/**
* Insert data in batches
diff --git
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
index f59e9df94..2c7316bf6 100644
---
a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
+++
b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongStreamExtEntityMapper.java
@@ -44,6 +44,9 @@ public interface InlongStreamExtEntityMapper {
List<InlongStreamExtEntity> selectByRelatedId(@Param("groupId") String
groupId, @Param("streamId") String streamId);
+ InlongStreamExtEntity selectByKey(@Param("groupId") String groupId,
@Param("streamId") String streamId,
+ @Param("keyName") String keyName);
+
int updateByPrimaryKey(InlongStreamExtEntity record);
/**
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
index ca518dad6..d9d7fef62 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterEntityMapper.xml
@@ -246,5 +246,4 @@
from inlong_cluster
where id = #{id,jdbcType=INTEGER}
</delete>
-
</mapper>
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
index 7093ccb89..dba70b312 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupExtEntityMapper.xml
@@ -45,7 +45,7 @@
and is_deleted = 0
</select>
<!-- Query the undeleted extended attributes based on inlongGroupId and
keyName -->
- <select id="selectByGroupIdAndKeyName"
resultType="org.apache.inlong.manager.dao.entity.InlongGroupExtEntity">
+ <select id="selectByUniqueKey"
resultType="org.apache.inlong.manager.dao.entity.InlongGroupExtEntity">
select
<include refid="Base_Column_List"/>
from inlong_group_ext
@@ -53,6 +53,7 @@
and key_name = #{keyName, jdbcType=VARCHAR}
and is_deleted = 0
</select>
+
<delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
delete
from inlong_group_ext
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
index 7e62e6e35..25e4e1a7a 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamExtEntityMapper.xml
@@ -78,6 +78,15 @@
</if>
and is_deleted = 0
</select>
+ <select id="selectByKey"
resultType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_stream_ext
+ where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ and key_name = #{keyName, jdbcType=VARCHAR}
+ and is_deleted = 0
+ </select>
<update id="updateByPrimaryKey"
parameterType="org.apache.inlong.manager.dao.entity.InlongStreamExtEntity">
update inlong_stream_ext
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
index e23a2ba78..6817afe72 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
@@ -17,10 +17,11 @@
package org.apache.inlong.manager.pojo.group;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import java.util.List;
@@ -29,31 +30,19 @@ import java.util.List;
*/
@Data
@ApiModel("Inlong group and topic info")
-public class InlongGroupTopicInfo {
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType")
+public abstract class InlongGroupTopicInfo {
@ApiModelProperty(value = "Inlong group id", required = true)
private String inlongGroupId;
- @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high
consistency: PULSAR")
+ @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high
consistency: PULSAR, or KAFKA")
private String mqType;
- @ApiModelProperty(value = "MQ resource, TubeMQ topic name, or Pulsar
namespace name")
- private String mqResource;
+ @ApiModelProperty(value = "Inlong cluster tag of the current InlongGroup")
+ private String inlongClusterTag;
- @ApiModelProperty(value = "Topic list, TubeMQ corresponds to inlong group,
there is only 1 topic, "
- + "Pulsar corresponds to inlong stream, there are multiple topics")
- private List<InlongStreamBriefInfo> streamTopics;
-
- @ApiModelProperty(value = "TubeMQ master URL")
- private String tubeMasterUrl;
-
- @ApiModelProperty(value = "Pulsar service URL")
- private String pulsarServiceUrl;
-
- @ApiModelProperty(value = "Pulsar admin URL")
- private String pulsarAdminUrl;
-
- @ApiModelProperty(value = "Kafka admin URL")
- private String kafkaBootstrapServers;
+ @ApiModelProperty(value = "MQ cluster info list")
+ private List<? extends ClusterInfo> clusterInfos;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaTopicInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaTopicInfo.java
new file mode 100644
index 000000000..5fed990ad
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaTopicInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.kafka;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+
+import java.util.List;
+
+@Data
+@Builder
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.KAFKA)
+@ApiModel("Inlong kafka group topic info")
+public class InlongKafkaTopicInfo extends InlongGroupTopicInfo {
+
+ @ApiModelProperty(value = "Kafka topics")
+ private List<String> topics;
+
+ public InlongKafkaTopicInfo() {
+ this.setMqType(MQType.KAFKA);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/none/InlongNoneMqTopicInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/none/InlongNoneMqTopicInfo.java
new file mode 100644
index 000000000..33f8058f5
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/none/InlongNoneMqTopicInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.none;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+
+/**
+ * Inlong group request without MQ.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Inlong group request without MQ")
+@JsonTypeDefine(value = MQType.NONE)
+public class InlongNoneMqTopicInfo extends InlongGroupTopicInfo {
+
+ // no field
+
+ public InlongNoneMqTopicInfo() {
+ this.setMqType(MQType.NONE);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
new file mode 100644
index 000000000..2ba108428
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarTopicInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.pulsar;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+
+import java.util.List;
+
+@Data
+@Builder
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.PULSAR)
+@ApiModel("Inlong pulsar group topic info")
+public class InlongPulsarTopicInfo extends InlongGroupTopicInfo {
+
+ @ApiModelProperty(value = "Pulsar tenant")
+ private String tenant;
+
+ @ApiModelProperty(value = "Pulsar namespace")
+ private String namespace;
+
+ @ApiModelProperty(value = "Pulsar topics")
+ private List<String> topics;
+
+ public InlongPulsarTopicInfo() {
+ this.setMqType(MQType.PULSAR);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQTopicInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQTopicInfo.java
new file mode 100644
index 000000000..de694d5f0
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQTopicInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.tubemq;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+
+@Data
+@Builder
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.TUBEMQ)
+@ApiModel("Inlong tube group topic info")
+public class InlongTubeMQTopicInfo extends InlongGroupTopicInfo {
+
+ @ApiModelProperty(value = "TubeMQ topic")
+ private String topic;
+
+ public InlongTubeMQTopicInfo() {
+ this.setMqType(MQType.TUBEMQ);
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 19ea780fb..de4467181 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -119,6 +119,15 @@ public interface InlongClusterService {
*/
PageResult<ClusterInfo> list(ClusterPageRequest request);
+ /**
+ * List clusters by tag and type
+ *
+ * @param clusterTag cluster tag
+ * @param clusterType cluster type
+ * @return cluster info list
+ */
+ List<ClusterInfo> listByTagAndType(String clusterTag, String clusterType);
+
/**
* Update cluster information
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 8edf90ff7..7f92149f0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -332,6 +332,25 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return pageResult;
}
+ @Override
+ public List<ClusterInfo> listByTagAndType(String clusterTag, String
clusterType) {
+ List<InlongClusterEntity> clusterEntities =
clusterMapper.selectByKey(clusterTag, null, clusterType);
+ if (CollectionUtils.isEmpty(clusterEntities)) {
+ throw new BusinessException(String.format("cannot find any cluster
by tag %s and type %s",
+ clusterTag, clusterType));
+ }
+
+ List<ClusterInfo> clusterInfos = clusterEntities.stream()
+ .map(entity -> {
+ InlongClusterOperator operator =
clusterOperatorFactory.getInstance(entity.getType());
+ return operator.getFromEntity(entity);
+ })
+ .collect(Collectors.toList());
+
+ LOGGER.debug("success to list inlong cluster by tag={}", clusterTag);
+ return clusterInfos;
+ }
+
@Override
public ClusterInfo getOne(String clusterTag, String name, String type) {
List<InlongClusterEntity> entityList =
clusterMapper.selectByKey(clusterTag, name, type);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index 0139dc0ce..7e0c667ef 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -29,7 +29,7 @@ import
org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
-import org.apache.inlong.manager.service.group.InlongNoneMqOperator;
+import org.apache.inlong.manager.service.group.InlongGroupOperator4NoneMQ;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,7 +41,7 @@ import org.springframework.stereotype.Service;
@Service
public class TubeClusterOperator extends AbstractClusterOperator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(InlongNoneMqOperator.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InlongGroupOperator4NoneMQ.class);
@Autowired
private ObjectMapper objectMapper;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
index d507c2d2c..f0d535efa 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
@@ -28,6 +28,7 @@ import
org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
import org.apache.inlong.manager.pojo.consume.tubemq.ConsumeTubeMQDTO;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQTopicInfo;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,9 +65,9 @@ public class ConsumeTubeMQOperator extends
AbstractConsumeOperator {
Preconditions.checkNotNull(topicInfo, "inlong group not exist: " +
groupId);
// one inlong group only has one TubeMQ topic
- String mqResource = topicInfo.getMqResource();
- Preconditions.checkTrue(Objects.equals(mqResource, request.getTopic()),
- String.format("inlong consume topic %s not belongs to inlong
group %s", request.getTopic(), groupId));
+ InlongTubeMQTopicInfo tubeMQTopic = (InlongTubeMQTopicInfo) topicInfo;
+ Preconditions.checkTrue(Objects.equals(tubeMQTopic.getTopic(),
request.getTopic()),
+ String.format("topic %s for consume not belongs to inlong
group %s", request.getTopic(), groupId));
}
@Override
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 469309c05..f64eaa192 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -58,6 +58,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
@@ -367,14 +368,15 @@ public class ConsumptionServiceImpl implements
ConsumptionService {
// Tube’s topic is the inlong group level, one inlong group, one
TubeMQ topic
String mqType = topicVO.getMqType();
+ // this class was deprecated, just comment it out
if (MQType.TUBEMQ.equals(mqType)) {
- String mqResource = topicVO.getMqResource();
+ String mqResource = /*topicVO.getMqResource()*/ null;
Preconditions.checkTrue(mqResource == null ||
mqResource.equals(info.getTopic()),
"topic [" + info.getTopic() + "] not belong to inlong
group " + groupId);
} else if (MQType.PULSAR.equals(mqType) ||
MQType.TDMQ_PULSAR.equals(mqType)) {
// Pulsar's topic is the inlong stream level.
// There will be multiple inlong streams under one inlong group,
and there will be multiple topics
- List<InlongStreamBriefInfo> streamTopics =
topicVO.getStreamTopics();
+ List<InlongStreamBriefInfo> streamTopics =
/*topicVO.getStreamTopics()*/ new ArrayList<>();
if (streamTopics != null && streamTopics.size() > 0) {
Set<String> topicSet = new
HashSet<>(Arrays.asList(info.getTopic().split(",")));
streamTopics.forEach(stream ->
topicSet.remove(stream.getMqResource()));
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
index ae325e298..eed0cba50 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/AbstractGroupOperator.java
@@ -17,17 +17,20 @@
package org.apache.inlong.manager.service.group;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
-import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
+import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,8 +44,19 @@ public abstract class AbstractGroupOperator implements
InlongGroupOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractGroupOperator.class);
+ @Autowired
+ protected ObjectMapper objectMapper;
+ @Autowired
+ protected InlongStreamService streamService;
+ @Autowired
+ protected InlongClusterService clusterService;
+
@Autowired
protected InlongGroupEntityMapper groupMapper;
+ @Autowired
+ protected InlongGroupExtEntityMapper groupExtMapper;
+ @Autowired
+ protected InlongStreamExtEntityMapper streamExtMapper;
@Override
@Transactional(rollbackFor = Throwable.class)
@@ -90,13 +104,4 @@ public abstract class AbstractGroupOperator implements
InlongGroupOperator {
}
}
- @Override
- public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
- InlongGroupTopicInfo topicInfo = new InlongGroupTopicInfo();
- topicInfo.setInlongGroupId(groupInfo.getInlongGroupId());
- topicInfo.setMqType(groupInfo.getMqType());
- topicInfo.setMqResource(groupInfo.getMqResource());
- return topicInfo;
- }
-
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator.java
index 3356771a3..1f1abc814 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator.java
@@ -17,10 +17,10 @@
package org.apache.inlong.manager.service.group;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
/**
* Interface of the inlong group operator.
@@ -72,4 +72,14 @@ public interface InlongGroupOperator {
*/
InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo);
+ /**
+ * Get backup topic info for the given inlong group if exists.
+ *
+ * @param groupInfo inlong group info
+ * @return backup topic info
+ */
+ default InlongGroupTopicInfo getBackupTopic(InlongGroupInfo groupInfo) {
+ return null;
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Kafka.java
similarity index 64%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Kafka.java
index 24807e5d2..83d46cade 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Kafka.java
@@ -17,34 +17,37 @@
package org.apache.inlong.manager.service.group;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaDTO;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaRequest;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaTopicInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_MQ_RESOURCE;
+
/**
* Inlong group operator for Kafka.
*/
@Service
-public class InlongKafkaOperator extends AbstractGroupOperator {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(InlongKafkaOperator.class);
+public class InlongGroupOperator4Kafka extends AbstractGroupOperator {
- @Autowired
- private ObjectMapper objectMapper;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InlongGroupOperator4Kafka.class);
@Override
public Boolean accept(String mqType) {
@@ -88,7 +91,35 @@ public class InlongKafkaOperator extends
AbstractGroupOperator {
@Override
public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
- InlongGroupTopicInfo topicInfo = super.getTopic(groupInfo);
+ InlongKafkaTopicInfo topicInfo = new InlongKafkaTopicInfo();
+ // each inlong stream is associated with a Kafka topic
+ List<String> topics =
streamService.getTopicList(groupInfo.getInlongGroupId()).stream()
+ .map(InlongStreamBriefInfo::getMqResource)
+ .collect(Collectors.toList());
+ topicInfo.setTopics(topics);
+
+ return topicInfo;
+ }
+
+ @Override
+ public InlongGroupTopicInfo getBackupTopic(InlongGroupInfo groupInfo) {
+ // set backup topics, each inlong stream is associated with a Kafka
topic
+ String groupId = groupInfo.getInlongGroupId();
+ List<InlongStreamBriefInfo> streamTopics =
streamService.getTopicList(groupId);
+ streamTopics.forEach(stream -> {
+ InlongStreamExtEntity streamExtEntity =
streamExtMapper.selectByKey(groupId, stream.getInlongStreamId(),
+ BACKUP_MQ_RESOURCE);
+ if (streamExtEntity != null &&
StringUtils.isNotBlank(streamExtEntity.getKeyValue())) {
+ stream.setMqResource(streamExtEntity.getKeyValue());
+ }
+ });
+
+ InlongKafkaTopicInfo topicInfo = new InlongKafkaTopicInfo();
+ List<String> topics = streamTopics.stream()
+ .map(InlongStreamBriefInfo::getMqResource)
+ .collect(Collectors.toList());
+ topicInfo.setTopics(topics);
+
return topicInfo;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongNoneMqOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4NoneMQ.java
similarity index 95%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongNoneMqOperator.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4NoneMQ.java
index 966156ac6..16114a293 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongNoneMqOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4NoneMQ.java
@@ -34,9 +34,9 @@ import org.springframework.stereotype.Service;
* Inlong group operator without MQ.
*/
@Service
-public class InlongNoneMqOperator extends AbstractGroupOperator {
+public class InlongGroupOperator4NoneMQ extends AbstractGroupOperator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(InlongNoneMqOperator.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InlongGroupOperator4NoneMQ.class);
@Override
public Boolean accept(String mqType) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
similarity index 67%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
index d00fe64b4..dc09f2be3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
@@ -17,41 +17,39 @@
package org.apache.inlong.manager.service.group;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_MQ_RESOURCE;
/**
* Inlong group operator for Pulsar.
*/
@Service
-public class InlongPulsarOperator extends AbstractGroupOperator {
+public class InlongGroupOperator4Pulsar extends AbstractGroupOperator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(InlongPulsarOperator.class);
-
- @Autowired
- private ObjectMapper objectMapper;
- @Autowired
- private InlongStreamService streamService;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InlongGroupOperator4Pulsar.class);
@Override
public Boolean accept(String mqType) {
@@ -112,14 +110,43 @@ public class InlongPulsarOperator extends
AbstractGroupOperator {
@Override
public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
- InlongGroupTopicInfo topicInfo = super.getTopic(groupInfo);
- // Pulsar topic corresponds to the inlong stream one-to-one
- List<InlongStreamBriefInfo> streamTopics =
streamService.getTopicList(groupInfo.getInlongGroupId());
- topicInfo.setStreamTopics(streamTopics);
- // TODO add cache for cluster info, and support extends different MQs
- // topicInfo.setTenant();
- // topicInfo.setAdminUrl();
- // topicInfo.setServiceUrl();
+ InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo();
+ topicInfo.setNamespace(groupInfo.getMqResource());
+ // each inlong stream is associated with a Pulsar topic
+ List<String> topics =
streamService.getTopicList(groupInfo.getInlongGroupId()).stream()
+ .map(InlongStreamBriefInfo::getMqResource)
+ .collect(Collectors.toList());
+ topicInfo.setTopics(topics);
+
+ return topicInfo;
+ }
+
+ @Override
+ public InlongGroupTopicInfo getBackupTopic(InlongGroupInfo groupInfo) {
+ // set backup namespace
+ String groupId = groupInfo.getInlongGroupId();
+ InlongGroupExtEntity extEntity =
groupExtMapper.selectByUniqueKey(groupId, BACKUP_MQ_RESOURCE);
+ InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo();
+ if (extEntity != null &&
StringUtils.isNotBlank(extEntity.getKeyValue())) {
+ topicInfo.setNamespace(extEntity.getKeyValue());
+ } else {
+ topicInfo.setNamespace(groupInfo.getMqResource());
+ }
+
+ // set backup topics, each inlong stream is associated with a Pulsar
topic
+ List<InlongStreamBriefInfo> streamTopics =
streamService.getTopicList(groupId);
+ streamTopics.forEach(stream -> {
+ InlongStreamExtEntity streamExtEntity =
streamExtMapper.selectByKey(groupId, stream.getInlongStreamId(),
+ BACKUP_MQ_RESOURCE);
+ if (streamExtEntity != null &&
StringUtils.isNotBlank(streamExtEntity.getKeyValue())) {
+ stream.setMqResource(streamExtEntity.getKeyValue());
+ }
+ });
+ List<String> topics = streamTopics.stream()
+ .map(InlongStreamBriefInfo::getMqResource)
+ .collect(Collectors.toList());
+ topicInfo.setTopics(topics);
+
return topicInfo;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongTubeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java
similarity index 67%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongTubeOperator.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java
index 660c6527d..b39347410 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongTubeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4TubeMQ.java
@@ -17,26 +17,31 @@
package org.apache.inlong.manager.service.group;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQInfo;
+import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQTopicInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
+import static
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_MQ_RESOURCE;
+
/**
* Inlong group operator for TubeMQ.
*/
@Service
-public class InlongTubeOperator extends AbstractGroupOperator {
+public class InlongGroupOperator4TubeMQ extends AbstractGroupOperator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(InlongTubeOperator.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InlongGroupOperator4TubeMQ.class);
@Override
public Boolean accept(String mqType) {
@@ -69,9 +74,25 @@ public class InlongTubeOperator extends
AbstractGroupOperator {
@Override
public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
- // TODO add cache for cluster info
- // topicInfo.setTubeMasterUrl(groupInfo.getMqType());
- return super.getTopic(groupInfo);
+ InlongTubeMQTopicInfo topicInfo = new InlongTubeMQTopicInfo();
+ // each inlong group is associated with a TubeMQ topic
+ topicInfo.setTopic(groupInfo.getMqResource());
+ return topicInfo;
+ }
+
+ @Override
+ public InlongGroupTopicInfo getBackupTopic(InlongGroupInfo groupInfo) {
+ // set backup topic, each inlong group is associated with a TubeMQ
topic
+ InlongTubeMQTopicInfo topicInfo = new InlongTubeMQTopicInfo();
+ InlongGroupExtEntity extEntity =
groupExtMapper.selectByUniqueKey(groupInfo.getInlongGroupId(),
+ BACKUP_MQ_RESOURCE);
+ if (extEntity != null &&
StringUtils.isNotBlank(extEntity.getKeyValue())) {
+ topicInfo.setTopic(extEntity.getKeyValue());
+ } else {
+ topicInfo.setTopic(groupInfo.getMqResource());
+ }
+
+ return topicInfo;
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index 48dccc0f5..3c07faa28 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -109,12 +109,19 @@ public interface InlongGroupService {
/**
* According to the group id, query the topic to which it belongs
*
- * @param groupId Inlong group id
- * @return Topic information
- * @apiNote TubeMQ corresponds to the group, only 1 topic
+ * @param groupId inlong group id
+ * @return topic info
*/
InlongGroupTopicInfo getTopic(String groupId);
+ /**
+ * According to the group id, query the backup topic to which it belongs
+ *
+ * @param groupId inlong group id
+ * @return backup topic info
+ */
+ InlongGroupTopicInfo getBackupTopic(String groupId);
+
/**
* Save the group modified when the approval is passed
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 150c8e317..ddcb6f0c5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -40,6 +40,7 @@ import
org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
@@ -56,6 +57,7 @@ import
org.apache.inlong.manager.pojo.sort.BaseSortConf.SortType;
import org.apache.inlong.manager.pojo.sort.FlinkSortConf;
import org.apache.inlong.manager.pojo.sort.UserDefinedSortConf;
import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.StreamSourceOperator;
import org.apache.inlong.manager.service.stream.InlongStreamService;
@@ -77,6 +79,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.inlong.common.constant.ClusterSwitch.BACKUP_CLUSTER_TAG;
import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
/**
@@ -88,18 +91,21 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
private static final Logger LOGGER =
LoggerFactory.getLogger(InlongGroupServiceImpl.class);
- @Autowired
- private InlongGroupOperatorFactory groupOperatorFactory;
@Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
private InlongGroupExtEntityMapper groupExtMapper;
@Autowired
+ private InlongStreamService streamService;
+ @Autowired
private StreamSourceEntityMapper streamSourceMapper;
@Autowired
- private SourceOperatorFactory sourceOperatorFactory;
+ private InlongClusterService clusterService;
+
@Autowired
- private InlongStreamService streamService;
+ private InlongGroupOperatorFactory groupOperatorFactory;
+ @Autowired
+ private SourceOperatorFactory sourceOperatorFactory;
/**
* Check whether modification is supported under the current group status,
and which fields can be modified.
@@ -363,16 +369,50 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
public InlongGroupTopicInfo getTopic(String groupId) {
// the group info will not null in get() method
InlongGroupInfo groupInfo = this.get(groupId);
+ InlongGroupOperator groupOperator =
groupOperatorFactory.getInstance(groupInfo.getMqType());
+ InlongGroupTopicInfo topicInfo = groupOperator.getTopic(groupInfo);
- InlongGroupOperator instance =
groupOperatorFactory.getInstance(groupInfo.getMqType());
- InlongGroupTopicInfo topicInfo = instance.getTopic(groupInfo);
+ // set the base params
+ topicInfo.setInlongGroupId(groupId);
+ String clusterTag = groupInfo.getInlongClusterTag();
+ topicInfo.setInlongClusterTag(clusterTag);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("success to get topic for groupId={}, result=" +
topicInfo, groupId);
- }
+ // assert: each MQ type has a corresponding type of cluster
+ List<ClusterInfo> clusterInfos =
clusterService.listByTagAndType(clusterTag, groupInfo.getMqType());
+ topicInfo.setClusterInfos(clusterInfos);
+
+ LOGGER.debug("success to get topic for groupId={}, result={}",
groupId, topicInfo);
return topicInfo;
}
+ @Override
+ public InlongGroupTopicInfo getBackupTopic(String groupId) {
+ // backup topic info saved in the ext table
+ InlongGroupExtEntity extEntity =
groupExtMapper.selectByUniqueKey(groupId, BACKUP_CLUSTER_TAG);
+ if (StringUtils.isBlank(extEntity.getKeyValue())) {
+ LOGGER.warn("not found any backup topic for groupId={}", groupId);
+ return null;
+ }
+
+ // the group info will not null in get() method
+ InlongGroupInfo groupInfo = this.get(groupId);
+ InlongGroupOperator groupOperator =
groupOperatorFactory.getInstance(groupInfo.getMqType());
+ InlongGroupTopicInfo backupTopicInfo =
groupOperator.getBackupTopic(groupInfo);
+
+ // set the base params
+ backupTopicInfo.setInlongGroupId(groupId);
+ String backupClusterTag = extEntity.getKeyValue();
+ backupTopicInfo.setInlongClusterTag(backupClusterTag);
+
+ // set backup cluster info
+ // assert: each MQ type has a corresponding type of cluster
+ List<ClusterInfo> clusterInfos =
clusterService.listByTagAndType(backupClusterTag, groupInfo.getMqType());
+ backupTopicInfo.setClusterInfos(clusterInfos);
+
+ LOGGER.debug("success to get backup topic for groupId={}, result={}",
groupId, backupTopicInfo);
+ return backupTopicInfo;
+ }
+
@Override
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW)
public void updateAfterApprove(InlongGroupApproveRequest approveRequest,
String operator) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 87775fb29..2afa702ca 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -427,7 +427,7 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
Preconditions.checkNotNull(groupId,
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
List<InlongStreamBriefInfo> topicList =
streamMapper.selectBriefList(groupId);
- LOGGER.debug("success to get topic list by groupId={}", groupId);
+ LOGGER.debug("success to get topic list by groupId={}, result
size={}", groupId, topicList.size());
return topicList;
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 8f105e10d..959d005e4 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -39,6 +39,7 @@ import
org.apache.inlong.manager.service.operationlog.OperationLog;
import org.apache.inlong.manager.service.user.LoginUserUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -104,6 +105,18 @@ public class InlongGroupController {
return Response.success(groupService.countGroupByUser(operator));
}
+ @GetMapping(value = "/group/getTopic/{groupId}")
+ @ApiOperation(value = "Get topic info")
+ public Response<InlongGroupTopicInfo> getTopic(@PathVariable String
groupId) {
+ return Response.success(groupService.getTopic(groupId));
+ }
+
+ @GetMapping(value = "/group/getBackupTopic/{groupId}")
+ @ApiOperation(value = "Get backup topic info")
+ public Response<InlongGroupTopicInfo> getBackupTopic(@PathVariable String
groupId) {
+ return Response.success(groupService.getBackupTopic(groupId));
+ }
+
@RequestMapping(value = "/group/startProcess/{groupId}", method =
RequestMethod.POST)
@ApiOperation(value = "Start inlong approval process")
@ApiImplicitParam(name = "groupId", value = "Inlong group id",
dataTypeClass = String.class)
@@ -162,12 +175,6 @@ public class InlongGroupController {
return
Response.success(groupProcessOperation.deleteProcessAsync(groupId, operator));
}
- @RequestMapping(value = "/group/getTopic/{groupId}", method =
RequestMethod.GET)
- @ApiOperation(value = "Get topic info")
- public Response<InlongGroupTopicInfo> getTopic(@PathVariable String
groupId) {
- return Response.success(groupService.getTopic(groupId));
- }
-
@PostMapping(value = "/group/reset")
@ApiOperation(value = "Reset group status when group is in
CONFIG_ING|SUSPENDING|RESTARTING|DELETING")
public Response<Boolean> reset(@RequestBody @Validated
InlongGroupResetRequest request) {