This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
commit f670f098d5dda1600c2fabca5b33f2b7aceffd99 Author: ganfengtan <[email protected]> AuthorDate: Sun Jul 3 14:18:28 2022 +0800 [INLONG-4843][Manager] Add RPC URL for TubeMQ cluster (#4844) --- .../common/pojo/cluster/tube/TubeClusterDTO.java | 61 ++++++++++++++++++++++ .../common/pojo/cluster/tube/TubeClusterInfo.java | 4 +- .../pojo/cluster/tube/TubeClusterRequest.java | 4 ++ .../service/cluster/TubeClusterOperator.java | 28 +++++++++- .../manager/service/mq/util/TubeMQOperator.java | 4 +- 5 files changed, 96 insertions(+), 5 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java new file mode 100644 index 000000000..b3bde1042 --- /dev/null +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.inlong.manager.common.pojo.cluster.tube; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; + +import javax.validation.constraints.NotNull; + +/** + * Tube cluster info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Tube cluster info") +public class TubeClusterDTO { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe + + @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080") + private String masterWebUrl; + + /** + * Get the dto instance from the JSON string. + */ + public static TubeClusterDTO getFromJson(@NotNull String extParams) { + try { + OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return OBJECT_MAPPER.readValue(extParams, TubeClusterDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage()); + } + } + +} diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java index 968a0de54..f32e89040 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.common.pojo.cluster.tube; import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -36,7 +37,8 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine; @ApiModel("Inlong cluster info for Tube") public class TubeClusterInfo extends ClusterInfo { - // no fields + @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080") + private String masterWebUrl; public TubeClusterInfo() { this.setType(ClusterType.TUBE); diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java index 613905519..f8b3002d1 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.common.pojo.cluster.tube; import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -35,6 +36,9 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine; @ApiModel("Inlong cluster request for Tube") public class TubeClusterRequest extends ClusterRequest { + @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080") + private String masterWebUrl; + // no field public TubeClusterRequest() { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java index 2a7e01c48..cb6eed3bc 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java @@ -17,17 +17,22 @@ package org.apache.inlong.manager.service.cluster; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo; import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest; +import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterDTO; import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo; +import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterRequest; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.dao.entity.InlongClusterEntity; import org.apache.inlong.manager.service.group.InlongNoneMqOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** @@ -38,6 +43,9 @@ public class TubeClusterOperator extends AbstractClusterOperator { private static final Logger LOGGER = LoggerFactory.getLogger(InlongNoneMqOperator.class); + @Autowired + private ObjectMapper objectMapper; + @Override public Boolean accept(String clusterType) { return getClusterType().equals(clusterType); @@ -50,7 +58,15 @@ public class TubeClusterOperator extends AbstractClusterOperator { @Override protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) { - LOGGER.info("do nothing for tube cluster in set target entity"); + TubeClusterRequest tubeRequest = (TubeClusterRequest) request; + CommonBeanUtils.copyProperties(tubeRequest, targetEntity, true); + try { + TubeClusterDTO dto = objectMapper.convertValue(tubeRequest, TubeClusterDTO.class); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + LOGGER.info("success to set entity for tube cluster"); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage()); + } } @Override @@ -58,7 +74,15 @@ public class TubeClusterOperator extends AbstractClusterOperator { if (entity == null) { throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND); } - return CommonBeanUtils.copyProperties(entity, TubeClusterInfo::new); + TubeClusterInfo tubeClusterInfo = new TubeClusterInfo(); + CommonBeanUtils.copyProperties(entity, tubeClusterInfo); + if (StringUtils.isNotBlank(entity.getExtParams())) { + TubeClusterDTO dto = TubeClusterDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, tubeClusterInfo); + } + + LOGGER.info("success to get tube cluster info from entity"); + return tubeClusterInfo; } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java index 0f68ba995..1fa9e489a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java @@ -69,7 +69,7 @@ public class TubeMQOperator { * Create topic for the given tube cluster. */ public void createTopic(@Nonnull TubeClusterInfo tubeCluster, String topicName, String operator) { - String masterUrl = tubeCluster.getUrl(); + String masterUrl = tubeCluster.getMasterWebUrl(); LOGGER.info("begin to create tube topic {} in master {}", topicName, masterUrl); if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(topicName)) { throw new BusinessException("tube master url or tube topic cannot be null"); @@ -88,7 +88,7 @@ public class TubeMQOperator { * Create consumer group for the given tube topic and cluster. */ public void createConsumerGroup(TubeClusterInfo tubeCluster, String topic, String consumerGroup, String operator) { - String masterUrl = tubeCluster.getUrl(); + String masterUrl = tubeCluster.getMasterWebUrl(); LOGGER.info("begin to create consumer group {} for topic {} in master {}", consumerGroup, topic, masterUrl); if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(consumerGroup) || StringUtils.isEmpty(topic)) { throw new BusinessException("tube master url, consumer group, or tube topic cannot be null");
