This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2db5c9b25 [INLONG-7067][Manager] Provide MQ cluster info in consumption details (#7068)
2db5c9b25 is described below
commit 2db5c9b2521fad9c9cf726b3c34380c79950873c
Author: fuweng11 <[email protected]>
AuthorDate: Tue Dec 27 12:13:15 2022 +0800
[INLONG-7067][Manager] Provide MQ cluster info in consumption details (#7068)
---
.../manager/common/consts/InlongConstants.java | 2 ++
.../manager/pojo/consume/InlongConsumeInfo.java | 5 ++++
.../service/consume/AbstractConsumeOperator.java | 5 ++++
.../service/consume/ConsumePulsarOperator.java | 33 ++++++++++++++++++----
4 files changed, 39 insertions(+), 6 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index c41299faa..8a49627bb 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -35,6 +35,8 @@ public class InlongConstants {
*/
public static final String COMMA = ",";
+ public static final String SLASH = "/";
+
public static final String COLON = ":";
public static final String SEMICOLON = ";";
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
index 48a48bed3..a8c2d3816 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
@@ -25,8 +25,10 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import java.util.Date;
+import java.util.List;
/**
* Base inlong consume info
@@ -87,6 +89,9 @@ public abstract class InlongConsumeInfo extends
BaseInlongConsume {
@ApiModelProperty(value = "Version number")
private Integer version;
+ @ApiModelProperty(value = "MQ cluster info list")
+ private List<? extends ClusterInfo> clusterInfos;
+
public abstract InlongConsumeRequest genRequest();
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
index 8840701bc..52a0439de 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.consume;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -69,6 +70,10 @@ public abstract class AbstractConsumeOperator implements
InlongConsumeOperator {
@Override
@Transactional(rollbackFor = Throwable.class, isolation =
Isolation.REPEATABLE_READ)
public void updateOpt(InlongConsumeRequest request, String operator) {
+ // firstly check the topic info
+ if (StringUtils.isNotBlank(request.getTopic())) {
+ this.checkTopicInfo(request);
+ }
// get the entity from request
InlongConsumeEntity entity = CommonBeanUtils.copyProperties(request,
InlongConsumeEntity::new);
// set the ext params
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
index 16bb1c84e..6edc3a243 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
@@ -19,19 +19,23 @@ package org.apache.inlong.manager.service.consume;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarDTO;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
@@ -40,6 +44,8 @@ import
org.apache.inlong.manager.service.stream.InlongStreamService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.List;
+
/**
* Inlong consume operator for Pulsar.
*/
@@ -85,12 +91,12 @@ public class ConsumePulsarOperator extends
AbstractConsumeOperator {
// check the origin topic from request exists
InlongPulsarTopicInfo pulsarTopic = (InlongPulsarTopicInfo) topicInfo;
String originTopic = request.getTopic();
+ if (originTopic.startsWith("persistent")) {
+ originTopic =
originTopic.substring(originTopic.lastIndexOf(InlongConstants.SLASH) + 1);
+ request.setTopic(originTopic);
+ }
Preconditions.checkTrue(pulsarTopic.getTopics().contains(originTopic),
"Pulsar topic not exist for " + originTopic);
-
- // format the topic to 'tenant/namespace/topic'
- request.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
- pulsarTopic.getTenant(), pulsarTopic.getNamespace(),
originTopic));
}
@Override
@@ -103,7 +109,14 @@ public class ConsumePulsarOperator extends
AbstractConsumeOperator {
ConsumePulsarDTO dto =
ConsumePulsarDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(dto, consumeInfo);
}
-
+ String groupId = entity.getInlongGroupId();
+ InlongGroupInfo groupInfo = groupService.get(groupId);
+ String clusterTag = groupInfo.getInlongClusterTag();
+ List<ClusterInfo> clusterInfos =
clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+ Preconditions.checkNotEmpty(clusterInfos, "pulsar cluster not exist
for groupId=" + groupId);
+ consumeInfo.setClusterInfos(clusterInfos);
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo)
clusterInfos.get(0);
+ consumeInfo.setTopic(getFullPulsarTopic(groupInfo,
pulsarCluster.getTenant(), entity.getTopic()));
return consumeInfo;
}
@@ -147,4 +160,12 @@ public class ConsumePulsarOperator extends
AbstractConsumeOperator {
}
}
+ private String getFullPulsarTopic(InlongGroupInfo groupInfo, String
tenant, String topic) {
+ if (StringUtils.isEmpty(tenant)) {
+ tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ }
+ String namespace = groupInfo.getMqResource();
+ return String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant,
namespace, topic);
+ }
+
}