This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit fdf450318f418aae70d0f380fdb2a13094186108 Author: EMsnap <[email protected]> AuthorDate: Tue Jan 5 15:25:00 2021 +0800 [TUBEMQ-465] add new feature - copy offset from one group to another --- .../manager/controller/node/request/BaseReq.java | 3 ++ .../request/{BaseReq.java => CloneOffsetReq.java} | 13 +++++--- .../controller/topic/TopicWebController.java | 21 ++++++++++++ .../apache/tubemq/manager/service/NodeService.java | 32 +++++++++++++++++++ .../service/tube/TubeHttpTopicInfoList.java | 26 ++++++++++++--- .../apache/tubemq/manager/utils/ConvertUtils.java | 37 ++++++++++++++-------- 6 files changed, 110 insertions(+), 22 deletions(-) diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java index fe3a32d..0159220 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java @@ -17,6 +17,9 @@ package org.apache.tubemq.manager.controller.node.request; +import lombok.Data; + +@Data public class BaseReq { public String type; public Integer clusterId; diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java similarity index 78% copy from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java copy to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java index fe3a32d..645084d 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneOffsetReq.java @@ -17,8 +17,13 @@ package org.apache.tubemq.manager.controller.node.request; -public class BaseReq { - public String type; - public Integer clusterId; - public String method; +import lombok.Data; + +@Data +public class CloneOffsetReq extends BaseReq { + public String sourceGroupName; + public String modifyUser; + public String topicName; + public String targetGroupName; + public String confModAuthToken; } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java index dd9cd52..b8d0f05 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java @@ -25,6 +25,7 @@ import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.tubemq.manager.controller.TubeMQResult; import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq; +import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq; import org.apache.tubemq.manager.controller.node.request.CloneTopicReq; import org.apache.tubemq.manager.entry.NodeEntry; import org.apache.tubemq.manager.repository.NodeRepository; @@ -155,6 +156,7 @@ public class TopicWebController { * @return * @throws Exception */ + @PostMapping("/query/config") public @ResponseBody String queryTopicConfig( @RequestParam Map<String, String> req) throws Exception { String url = masterUtils.getQueryUrl(req); @@ -162,4 +164,23 @@ public class TopicWebController { } + /** + * + * @param req + * @return + * @throws Exception + */ + @PostMapping("/clone/offset") + public @ResponseBody TubeMQResult cloneOffset( + @RequestBody CloneOffsetReq req) throws Exception { + if (req.getClusterId() == null) + return TubeMQResult.getErrorResult("please input clusterId"); + NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue( + req.getClusterId()); + if (masterEntry == null) + return TubeMQResult.getErrorResult("no such cluster"); + return nodeService.cloneOffsetToOtherGroups(req, masterEntry); + } + + } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java index 1b15770..7de3ec3 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java @@ -45,12 +45,14 @@ import org.apache.tubemq.manager.controller.TubeMQResult; import org.apache.tubemq.manager.controller.node.request.AddBrokersReq; import org.apache.tubemq.manager.controller.node.request.AddTopicReq; import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq; +import org.apache.tubemq.manager.controller.node.request.CloneOffsetReq; import org.apache.tubemq.manager.controller.node.request.CloneTopicReq; import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq; import org.apache.tubemq.manager.entry.NodeEntry; import org.apache.tubemq.manager.repository.NodeRepository; import org.apache.tubemq.manager.service.tube.*; import org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList.BrokerInfo; +import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -73,6 +75,9 @@ public class NodeService { private final TopicBackendWorker worker; + @Value("${manager.broker.webPort:8081}") + private int brokerWebPort; + @Autowired private NodeRepository nodeRepository; @@ -443,4 +448,31 @@ public class NodeService { return addTopicToBrokers(addTopicReq, master); } + + public TubeMQResult cloneOffsetToOtherGroups(CloneOffsetReq req, NodeEntry master) + throws Exception { + + // 1. query the corresponding brokers having given topic + TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, req.getTopicName()); + TubeMQResult result = new TubeMQResult(); + + if (topicInfoList != null) { + List<TopicInfo> topicInfos = topicInfoList.getTopicInfo(); + // 2. for each broker, request to clone offset + for (TopicInfo topicInfo : topicInfos) { + String brokerIp = topicInfo.getBrokerIp(); + String url = SCHEMA + brokerIp + ":" + brokerWebPort + + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req); + result = requestMaster(url); + if (result.getErrCode() != 0) { + return result; + } + } + } + + return result; + } + + + } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java index bff5a7b..ae747a9 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java @@ -50,10 +50,10 @@ public class TubeHttpTopicInfoList { @Data public static class RunInfo { - private boolean acceptPublish; - private boolean acceptSubscribe; - private int numPartitions; - private int numTopicStores; + private String acceptPublish; + private String acceptSubscribe; + private String numPartitions; + private String numTopicStores; private String brokerManageStatus; } @@ -102,6 +102,15 @@ public class TubeHttpTopicInfoList { return tmpBrokerIdList; } + + public List<TopicInfo> getTopicInfo() { + List<Integer> tmpBrokerIdList = new ArrayList<>(); + if (data != null) { + return data.get(0).getTopicInfo(); + } + return null; + } + public AddTopicReq getAddTopicReq(List<Integer> brokerIds, List<String> targetTopicNames, String token) { AddTopicReq req = new AddTopicReq(); @@ -118,6 +127,14 @@ public class TubeHttpTopicInfoList { String brokerStr = StringUtils.join(brokerIds, ","); String topic = StringUtils.join(targetTopicNames, ","); + setAttributes(token, req, topicInfo, brokerStr, topic); + return req; + } + + + + private void setAttributes(String token, AddTopicReq req, TopicInfo topicInfo, String brokerStr, + String topic) { req.setBrokerId(brokerStr); req.setTopicName(topic); req.setMethod(BATCH_ADD_TOPIC); @@ -130,6 +147,5 @@ public class TubeHttpTopicInfoList { req.setUnflushInterval(topicInfo.getUnflushInterval()); req.setConfModAuthToken(token); req.setDeletePolicy(topicInfo.getDeletePolicy()); - return req; } } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java index f93819f..91e4f8f 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java @@ -34,21 +34,32 @@ public class ConvertUtils { public static String convertReqToQueryStr(Object req) throws Exception { List<String> queryList = new ArrayList<>(); Class<?> clz = req.getClass(); - Field[] fields = clz.getDeclaredFields(); - for (Field field : fields) { - field.setAccessible(true); - Object o = field.get(req); - String value; - // convert list to json string - if (o == null) continue; - if (o instanceof List) { - value = gson.toJson(o); - } else { - value = o.toString(); - } - queryList.add(field.getName() + "=" + URLEncoder.encode( + List fieldsList = new ArrayList<Field[]>(); + + while (clz != null) { + Field[] declaredFields = clz.getDeclaredFields(); + fieldsList.add(declaredFields); + clz = clz.getSuperclass(); + } + + for (Object fields:fieldsList) { + Field[] f = (Field[]) fields; + for (Field field : f) { + field.setAccessible(true); + Object o = field.get(req); + String value; + // convert list to json string + if (o == null) continue; + if (o instanceof List) { + value = gson.toJson(o); + } else { + value = o.toString(); + } + queryList.add(field.getName() + "=" + URLEncoder.encode( value, UTF_8.toString())); + } } + return StringUtils.join(queryList, "&"); } }
