This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
     new a380eb9  [TUBEMQ-481] add new feature - clone broker (#374)
a380eb9 is described below

commit a380eb924c0c31306d5827f44dc0c13de45a809a
Author: EMsnap <[email protected]>
AuthorDate: Tue Dec 29 00:39:13 2020 -0800

    [TUBEMQ-481] add new feature - clone broker (#374)
    
    * [TUBEMQ-481] add new feature - clone broker
    
    * [TUBEMQ-481] add comment
---
 tubemq-manager/pom.xml                             |   6 +
 .../tubemq/manager/controller/TubeMQResult.java    |   3 +-
 .../manager/controller/node/NodeController.java    |  40 ++++++-
 .../controller/node/request/AddBrokersReq.java     |  14 +++
 .../{AddBrokersReq.java => AddTopicReq.java}       |  32 ++---
 .../request/{AddBrokersReq.java => BaseReq.java}   |  30 +----
 .../{AddBrokersReq.java => CloneBrokersReq.java}   |  28 ++---
 .../{AddBrokersReq.java => QueryBrokerCfgReq.java} |  35 +++---
 .../apache/tubemq/manager/service/NodeService.java | 133 +++++++++++++++++++--
 .../tubemq/manager/service/TubeMQHttpConst.java    |   5 +
 .../tube/AddBrokerResult.java}                     |  30 ++---
 .../tube/AddTopicsResult.java}                     |  27 +----
 .../node/request => service/tube}/BrokerConf.java  |  27 ++++-
 .../tube/BrokerStatusInfo.java}                    |  30 ++---
 .../tube/IpIdRelation.java}                        |  29 +----
 .../tube/TubeHttpBrokerCfgInfo.java}               |  42 +++----
 .../service/tube/TubeHttpClusterInfoList.java      |   4 +-
 .../apache/tubemq/manager/utils/ConvertUtils.java  |   1 +
 .../web/handler/WebBrokerDefConfHandler.java       |  20 +++-
 19 files changed, 318 insertions(+), 218 deletions(-)

diff --git a/tubemq-manager/pom.xml b/tubemq-manager/pom.xml
index 18e692c..eaf220a 100644
--- a/tubemq-manager/pom.xml
+++ b/tubemq-manager/pom.xml
@@ -49,6 +49,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.3</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-validation</artifactId>
         </dependency>
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
index 6e4a970..d8e033e 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
@@ -30,9 +30,10 @@ public class TubeMQResult {
     private String errMsg = "";
     private int errCode = 0;
     private boolean result = true;
+    private String data;
 
     public static TubeMQResult getErrorResult(String errorMsg) {
         return TubeMQResult.builder().errCode(-1)
-                .errMsg(errorMsg).result(false).build();
+                .errMsg(errorMsg).result(false).data("").build();
     }
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index ab5567d..2c81825 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -17,25 +17,35 @@
 
 package org.apache.tubemq.manager.controller.node;
 
+import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.NodeService;
+import org.apache.tubemq.manager.service.tube.*;
 import org.apache.tubemq.manager.utils.MasterUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+import static 
org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.*;
 import static 
org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
 import static org.apache.tubemq.manager.utils.MasterUtils.*;
 
@@ -59,6 +69,13 @@ public class NodeController {
     @Autowired
     MasterUtils masterUtil;
 
+    /**
+     * query brokers in certain cluster
+     * @param type
+     * @param method
+     * @param clusterId
+     * @return
+     */
     @RequestMapping(value = "/query/clusterInfo", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
     public @ResponseBody String queryInfo(@RequestParam String type, 
@RequestParam String method,
@@ -95,13 +112,29 @@ public class NodeController {
         return queryMaster(url);
     }
 
+    /**
+     * clone source broker to generate brokers with the same config and copy 
the topics in it.
+     * @param req
+     * @return
+     * @throws Exception
+     */
+    @RequestMapping(value = "/clone", method = RequestMethod.POST)
+    public @ResponseBody String cloneBrokers(
+            @RequestBody CloneBrokersReq req) throws Exception {
+        int clusterId = req.getClusterId();
+        TubeMQResult tubeResult = nodeService.cloneBrokersWithTopic(req, 
clusterId);
+        return gson.toJson(tubeResult);
+    }
+
+
+
 
     /**
      * add brokers to cluster, need to check token and
      * make sure user has authorization to modify it.
      */
     @RequestMapping(value = "/add", method = RequestMethod.POST)
-    public @ResponseBody String addBrokersToCluster(
+    public @ResponseBody String addBrokers(
             @RequestBody AddBrokersReq req) throws Exception {
         String token = req.getConfModAuthToken();
         int clusterId = req.getClusterId();
@@ -175,8 +208,6 @@ public class NodeController {
         return gson.toJson(result);
     }
 
-
-
     private TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry 
masterEntry) throws Exception {
         String url = SCHEMA + masterEntry.getIp() + ":" + 
masterEntry.getWebPort()
                 + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
@@ -184,5 +215,4 @@ public class NodeController {
         return tubeMQResult;
     }
 
-
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
index 13d10c4..66281e6 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
@@ -18,10 +18,14 @@
 package org.apache.tubemq.manager.controller.node.request;
 
 
+import lombok.Builder;
 import lombok.Data;
+import org.apache.tubemq.manager.service.tube.BrokerConf;
 
 import java.util.List;
 
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.*;
+
 @Data
 public class AddBrokersReq {
 
@@ -43,4 +47,14 @@ public class AddBrokersReq {
 
     public List<BrokerConf> brokerJsonSet;
 
+    public static AddBrokersReq getAddBrokerReq(String token, int clusterId) {
+        AddBrokersReq req = new AddBrokersReq();
+        req.setClusterId(clusterId);
+        req.setMethod(BATCH_ADD_BROKER);
+        req.setType(OP_MODIFY);
+        req.setCreateUser(WEB_API);
+        req.setConfModAuthToken(token);
+        return req;
+    }
+
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
similarity index 74%
copy from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
index 13d10c4..3eb73ab 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,27 +20,19 @@ package org.apache.tubemq.manager.controller.node.request;
 
 import lombok.Data;
 
-import java.util.List;
-
 @Data
-public class AddBrokersReq {
-
-    public String confModAuthToken;
-
+public class AddTopicReq extends BaseReq {
     public String createUser;
-
-    public int clusterId;
-
-    /**
-     * admin_bath_add_broker_configure
-     */
+    public String deleteWhen;
+    public Integer unflushThreshold;
+    public Boolean acceptPublish;
+    public Integer numPartitions;
+    public String deletePolicy;
+    public Integer unflushInterval;
+    public Boolean acceptSubscribe;
     public String method;
-
-    /**
-     * op_modify
-     */
     public String type;
-
-    public List<BrokerConf> brokerJsonSet;
-
+    public String brokerId;
+    public String confModAuthToken;
+    public String topicName;
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
similarity index 74%
copy from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
index 13d10c4..fe3a32d 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,30 +17,8 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-
-import lombok.Data;
-
-import java.util.List;
-
-@Data
-public class AddBrokersReq {
-
-    public String confModAuthToken;
-
-    public String createUser;
-
-    public int clusterId;
-
-    /**
-     * admin_bath_add_broker_configure
-     */
-    public String method;
-
-    /**
-     * op_modify
-     */
+public class BaseReq {
     public String type;
-
-    public List<BrokerConf> brokerJsonSet;
-
+    public Integer clusterId;
+    public String method;
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneBrokersReq.java
similarity index 81%
copy from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneBrokersReq.java
index 13d10c4..f59c5d5 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneBrokersReq.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,30 +17,18 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-
 import lombok.Data;
+import org.apache.tubemq.manager.entry.TopicEntry;
 
 import java.util.List;
 
-@Data
-public class AddBrokersReq {
 
+@Data
+public class CloneBrokersReq {
+    Integer sourceBrokerId;
+    List<String> targetIps;
+    List<AddTopicReq> addTopicReqs;
     public String confModAuthToken;
-
     public String createUser;
-
     public int clusterId;
-
-    /**
-     * admin_bath_add_broker_configure
-     */
-    public String method;
-
-    /**
-     * op_modify
-     */
-    public String type;
-
-    public List<BrokerConf> brokerJsonSet;
-
-}
+}
\ No newline at end of file
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/QueryBrokerCfgReq.java
similarity index 61%
copy from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/QueryBrokerCfgReq.java
index 13d10c4..d60c375 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/QueryBrokerCfgReq.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,30 +17,31 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-
 import lombok.Data;
 
-import java.util.List;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_QUERY;
+import static 
org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY_BROKER_CONFIG;
 
 @Data
-public class AddBrokersReq {
-
-    public String confModAuthToken;
-
-    public String createUser;
+public class QueryBrokerCfgReq {
 
-    public int clusterId;
-
-    /**
-     * admin_bath_add_broker_configure
-     */
     public String method;
 
-    /**
-     * op_modify
-     */
+    public Integer brokerId;
+
     public String type;
 
-    public List<BrokerConf> brokerJsonSet;
+    public boolean withDetail;
+
+    public boolean withTopic;
 
+    public static QueryBrokerCfgReq getReq(Integer brokerId) {
+        QueryBrokerCfgReq req = new QueryBrokerCfgReq();
+        req.setBrokerId(brokerId);
+        req.setMethod(QUERY_BROKER_CONFIG);
+        req.setType(OP_QUERY);
+        req.setWithTopic(false);
+        req.setWithDetail(true);
+        return req;
+    }
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index 836fe8a..ac60aee 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -18,34 +18,37 @@
 package org.apache.tubemq.manager.service;
 
 
+import static 
org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD_TUBE_TOPIC;
 import static 
org.apache.tubemq.manager.service.TubeMQHttpConst.BROKER_RUN_STATUS;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.RELOAD_BROKER;
 import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
 import static 
org.apache.tubemq.manager.service.TubeMQHttpConst.TOPIC_CONFIG_INFO;
+import static 
org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
+import static org.apache.tubemq.manager.utils.MasterUtils.*;
 
+import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList;
-import org.apache.tubemq.manager.service.tube.TubeHttpClusterInfoList;
-import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
-import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList;
+import org.apache.tubemq.manager.service.tube.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
@@ -120,6 +123,106 @@ public class NodeService {
     }
 
 
+    public TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req, int 
clusterId) throws Exception {
+
+        // 1. query source broker config
+        QueryBrokerCfgReq queryReq = 
QueryBrokerCfgReq.getReq(req.getSourceBrokerId());
+        NodeEntry masterEntry = 
nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+                clusterId);
+        BrokerStatusInfo brokerStatusInfo = getBrokerStatusInfo(queryReq, 
masterEntry);
+
+        // 2. use source broker config to clone brokers
+        BrokerConf sourceBrokerConf = brokerStatusInfo.getData().get(0);
+        AddBrokersReq addBrokersReq = getBatchAddBrokersReq(req, clusterId, 
sourceBrokerConf);
+
+        // 3. request master, return broker ids generated by master
+        AddBrokerResult addBrokerResult = 
addBrokersToClusterWithId(addBrokersReq, masterEntry);
+
+        // might have duplicate brokers
+        if (addBrokerResult.getErrCode() != 0) {
+            return TubeMQResult.getErrorResult(addBrokerResult.getErrMsg());
+        }
+        List<Integer> brokerIds = getBrokerIds(addBrokerResult);
+        List<AddTopicReq> addTopicReqs = req.getAddTopicReqs();
+
+        // 4. add topic to brokers
+        return addTopicToBrokers(masterEntry, brokerIds, addTopicReqs);
+    }
+
+    private TubeMQResult addTopicToBrokers(NodeEntry masterEntry, 
List<Integer> brokerIds, List<AddTopicReq> addTopicReqs) {
+        TubeMQResult tubeResult = new TubeMQResult();
+        AddTopicsResult addTopicsResult = new AddTopicsResult();
+
+        if (CollectionUtils.isEmpty(addTopicReqs)) {
+            return tubeResult;
+        }
+        addTopicReqs.forEach(addTopicReq -> {
+            try {
+                String brokerStr = StringUtils.join(brokerIds, ",");
+                addTopicReq.setBrokerId(brokerStr);
+                TubeMQResult result = addTopicToBrokers(addTopicReq, 
masterEntry);
+                if (result.getErrCode() == 0) {
+                    
addTopicsResult.getSuccessTopics().add(addTopicReq.getTopicName());
+                } else {
+                    
addTopicsResult.getFailTopics().add(addTopicReq.getTopicName());
+                }
+            } catch (Exception e) {
+                log.error("add topic to brokers fail with exception", e);
+                
addTopicsResult.getFailTopics().add(addTopicReq.getTopicName());
+            }
+        });
+
+        tubeResult.setData(gson.toJson(addTopicsResult));
+        return tubeResult;
+    }
+
+    private List<Integer> getBrokerIds(AddBrokerResult addBrokerResult) {
+        List<IpIdRelation> ipids = addBrokerResult.getData();
+        List<Integer> brokerIds = Lists.newArrayList();
+        for (IpIdRelation ipid : ipids) {
+            brokerIds.add(ipid.getId());
+        }
+        return brokerIds;
+    }
+
+    private AddBrokersReq getBatchAddBrokersReq(CloneBrokersReq req, int 
clusterId, BrokerConf sourceBrokerConf) {
+        AddBrokersReq addBrokersReq = 
getAddBrokerReq(req.getConfModAuthToken(), clusterId);
+
+        // generate add brokers req using given target broker ips
+        List<BrokerConf> brokerConfs = Lists.newArrayList();
+        req.getTargetIps().forEach(ip -> {
+            BrokerConf brokerConf = new BrokerConf(sourceBrokerConf);
+            brokerConf.setBrokerIp(ip);
+            brokerConf.setBrokerId(0);
+            brokerConfs.add(brokerConf);
+        });
+        addBrokersReq.setBrokerJsonSet(brokerConfs);
+        return addBrokersReq;
+    }
+
+    private BrokerStatusInfo getBrokerStatusInfo(QueryBrokerCfgReq queryReq, 
NodeEntry masterEntry) throws Exception {
+        String url = SCHEMA + masterEntry.getIp() + ":" + 
masterEntry.getWebPort()
+                + "/" + TUBE_REQUEST_PATH + "?" + 
convertReqToQueryStr(queryReq);
+        BrokerStatusInfo brokerStatusInfo = gson.fromJson(queryMaster(url),
+                BrokerStatusInfo.class);
+        return brokerStatusInfo;
+    }
+
+    public TubeMQResult addTopicToBrokers(AddTopicReq req, NodeEntry 
masterEntry) throws Exception {
+        String url = SCHEMA + masterEntry.getIp() + ":" + 
masterEntry.getWebPort()
+                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+        return requestMaster(url);
+    }
+
+
+    public TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry 
masterEntry) throws Exception {
+        String url = SCHEMA + masterEntry.getIp() + ":" + 
masterEntry.getWebPort()
+                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+        TubeMQResult tubeMQResult = requestMaster(url);
+        return tubeMQResult;
+    }
+
+
     private boolean configBrokersForTopics(NodeEntry nodeEntry,
             Set<String> topics, List<Integer> brokerList, int maxBrokers) {
         List<Integer> finalBrokerList = brokerList.subList(0, maxBrokers);
@@ -296,4 +399,18 @@ public class NodeService {
     public void close() throws IOException {
         httpclient.close();
     }
+
+    public AddBrokerResult addBrokersToClusterWithId(AddBrokersReq req, 
NodeEntry masterEntry) throws Exception {
+
+        String url = SCHEMA + masterEntry.getIp() + ":" + 
masterEntry.getWebPort()
+                + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+        HttpGet httpget = new HttpGet(url);
+        try (CloseableHttpResponse response = httpclient.execute(httpget)) {
+            return gson.fromJson(new 
InputStreamReader(response.getEntity().getContent()),
+                    AddBrokerResult.class);
+        } catch (Exception ex) {
+            log.error("exception caught while requesting broker status", ex);
+        }
+        return null;
+    }
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
index d1d4949..71be4ec 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
@@ -27,4 +27,9 @@ public class TubeMQHttpConst {
             "/webapi.htm?type=op_modify&method=admin_add_new_topic_record";
     public static final String RELOAD_BROKER =
             "/webapi.htm?type=op_modify&method=admin_reload_broker_configure";
+    public static final String QUERY_BROKER_CONFIG = 
"admin_query_broker_configure";
+    public static final String OP_QUERY = "op_query";
+    public static final String OP_MODIFY = "op_modify";
+    public static final String BATCH_ADD_BROKER = 
"admin_bath_add_broker_configure";
+    public static final String WEB_API = "webapi";
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddBrokerResult.java
similarity index 69%
copy from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddBrokerResult.java
index 13d10c4..f43074a 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddBrokerResult.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,32 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
-
+package org.apache.tubemq.manager.service.tube;
 
 import lombok.Data;
 
 import java.util.List;
 
 @Data
-public class AddBrokersReq {
-
-    public String confModAuthToken;
-
-    public String createUser;
-
-    public int clusterId;
-
-    /**
-     * admin_bath_add_broker_configure
-     */
-    public String method;
-
-    /**
-     * op_modify
-     */
-    public String type;
-
-    public List<BrokerConf> brokerJsonSet;
-
+public class AddBrokerResult {
+    private int code;
+    private String errMsg;
+    private int errCode;
+    private List<IpIdRelation> data;
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddTopicsResult.java
similarity index 69%
copy from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddTopicsResult.java
index 13d10c4..6cd3b01 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddTopicsResult.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,32 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
-
+package org.apache.tubemq.manager.service.tube;
 
+import com.google.common.collect.Lists;
 import lombok.Data;
 
 import java.util.List;
 
 @Data
-public class AddBrokersReq {
-
-    public String confModAuthToken;
-
-    public String createUser;
-
-    public int clusterId;
-
-    /**
-     * admin_bath_add_broker_configure
-     */
-    public String method;
-
-    /**
-     * op_modify
-     */
-    public String type;
+public class AddTopicsResult {
 
-    public List<BrokerConf> brokerJsonSet;
+    public List<String> failTopics = Lists.newArrayList();
 
+    public List<String> successTopics = Lists.newArrayList();
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerConf.java
similarity index 58%
copy from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
copy to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerConf.java
index fc48d47..5dc908c 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerConf.java
@@ -15,12 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
-
-
+package org.apache.tubemq.manager.service.tube;
 import lombok.Data;
 
-import java.security.SecureRandom;
 
 @Data
 public class BrokerConf {
@@ -41,6 +38,28 @@ public class BrokerConf {
     public Integer memCacheMsgCntInK;
     public Integer memCacheMsgSizeInMB;
     public Integer memCacheFlushIntegervl;
+    public String deletePolicy;
+
+    public BrokerConf(BrokerConf other) {
+        this.brokerIp = other.brokerIp;
+        this.brokerPort = other.brokerPort;
+        this.brokerId = other.brokerId;
+        this.deleteWhen = other.deleteWhen;
+        this.numPartitions = other.numPartitions;
+        this.unflushThreshold = other.unflushThreshold;
+        this.unflushIntegererval = other.unflushIntegererval;
+        this.unflushDataHold = other.unflushDataHold;
+        this.acceptPublish = other.acceptPublish;
+        this.acceptSubscribe = other.acceptSubscribe;
+        this.createUser = other.createUser;
+        this.brokerTLSPort = other.brokerTLSPort;
+        this.numTopicStores = other.numTopicStores;
+        this.memCacheMsgCntInK = other.memCacheMsgCntInK;
+        this.memCacheMsgSizeInMB = other.memCacheMsgSizeInMB;
+        this.memCacheFlushIntegervl = other.memCacheFlushIntegervl;
+        this.deletePolicy = other.deletePolicy;
+    }
+
 
 }
 
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
similarity index 69%
copy from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
index 13d10c4..11e806e 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,32 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
-
+package org.apache.tubemq.manager.service.tube;
 
 import lombok.Data;
 
 import java.util.List;
 
 @Data
-public class AddBrokersReq {
-
-    public String confModAuthToken;
-
-    public String createUser;
-
-    public int clusterId;
-
-    /**
-     * admin_bath_add_broker_configure
-     */
-    public String method;
-
-    /**
-     * op_modify
-     */
-    public String type;
-
-    public List<BrokerConf> brokerJsonSet;
-
+public class BrokerStatusInfo {
+    public int code;
+    public String errMsg;
+    // total broker configuration info list of brokers.
+    public List<BrokerConf> data;
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/IpIdRelation.java
similarity index 67%
copy from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/IpIdRelation.java
index 13d10c4..963b79e 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/IpIdRelation.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,32 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
 
+package org.apache.tubemq.manager.service.tube;
 
 import lombok.Data;
 
-import java.util.List;
-
 @Data
-public class AddBrokersReq {
-
-    public String confModAuthToken;
-
-    public String createUser;
-
-    public int clusterId;
-
-    /**
-     * admin_bath_add_broker_configure
-     */
-    public String method;
-
-    /**
-     * op_modify
-     */
-    public String type;
-
-    public List<BrokerConf> brokerJsonSet;
-
+public class IpIdRelation {
+    public String ip;
+    public Integer id;
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
similarity index 80%
rename from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
rename to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
index fc48d47..5debb19 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,33 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.tubemq.manager.controller.node.request;
 
+package org.apache.tubemq.manager.service.tube;
 
-import lombok.Data;
-
-import java.security.SecureRandom;
-
-@Data
-public class BrokerConf {
-
-    public String brokerIp;
-    public Integer brokerPort;
-    public Integer brokerId;
-    public String deleteWhen;
-    public Integer numPartitions;
-    public Integer unflushThreshold;
-    public Integer unflushIntegererval;
-    public Integer unflushDataHold;
+public class TubeHttpBrokerCfgInfo {
     public boolean acceptPublish;
     public boolean acceptSubscribe;
-    public String createUser;
+    public Integer brokerId;
+    public String brokerIp;
+    public Integer brokerPort;
     public Integer brokerTLSPort;
-    public Integer numTopicStores;
+    public String createDate;
+    public String createUser;
+    public String deletePolicy;
+    public String deleteWhen;
+    public String modifyDate;
+    public String modifyUser;
+    public Integer memCacheFlushIntvl;
     public Integer memCacheMsgCntInK;
     public Integer memCacheMsgSizeInMB;
-    public Integer memCacheFlushIntegervl;
-
+    public Integer numPartitions;
+    public Integer numTopicStores;
+    public Integer unflushDataHold;
+    public Integer unflushInterval;
+    public Integer unflushThreshold;
+    public boolean hasTLSPort;
 }
-
-
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
index 9a36de9..4da3f04 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
@@ -28,7 +28,7 @@ import java.util.Map;
 @Data
 public class TubeHttpClusterInfoList extends TubeMQResult {
 
-    private List<ClusterData> data = new ArrayList<>();
+    private List<ClusterData> clusterData = new ArrayList<>();
 
     @Data
     @AllArgsConstructor
@@ -63,7 +63,7 @@ public class TubeHttpClusterInfoList extends TubeMQResult {
                     ClusterData.ClusterInfo singleClusterInfo = 
getSingleClusterInfo(entries);
                     ClusterData clusterData =
                             new ClusterData(id, 
entries.get(0).getClusterName(), singleClusterInfo);
-                    clusterInfoList.getData().add(clusterData);
+                    clusterInfoList.getClusterData().add(clusterData);
                 }
         );
         return clusterInfoList;
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
index 126b347..f93819f 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -40,6 +40,7 @@ public class ConvertUtils {
             Object o = field.get(req);
             String value;
             // convert list to json string
+            if (o == null) continue;
             if (o instanceof List) {
                 value = gson.toJson(o);
             } else {
diff --git 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
index befb362..d967af3 100644
--- 
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
+++ 
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
@@ -18,6 +18,8 @@
 package org.apache.tubemq.server.master.web.handler;
 
 import static java.lang.Math.abs;
+
+import com.google.common.collect.Maps;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
@@ -353,6 +355,9 @@ public class WebBrokerDefConfHandler extends 
AbstractWebHandler {
             HashMap<String, BdbBrokerConfEntity> inBrokerConfEntityMap = new 
HashMap<>();
             ConcurrentHashMap<Integer, BdbBrokerConfEntity> 
bdbBrokerConfEntityMap =
                     brokerConfManager.getBrokerConfStoreMap();
+            // save broker ip and id relation.
+            Map<String, Integer> brokerIpId = Maps.newHashMap();
+
             for (int count = 0; count < brokerJsonArray.size(); count++) {
                 Map<String, String> jsonObject = brokerJsonArray.get(count);
                 try {
@@ -415,6 +420,7 @@ public class WebBrokerDefConfHandler extends 
AbstractWebHandler {
                     final String inputKey = 
strBuffer.append(brokerId).append("-")
                             
.append(brokerIp).append("-").append(brokerPort).toString();
                     strBuffer.delete(0, strBuffer.length());
+                    brokerIpId.put(brokerIp, brokerId);
                     final String deleteWhen =
                             
WebParameterUtils.validDecodeStringParameter("deleteWhen", 
jsonObject.get("deleteWhen"),
                                     
TServerConstants.CFG_DELETEWHEN_MAX_LENGTH, false, "0 0 6,18 * * ?");
@@ -494,7 +500,19 @@ public class WebBrokerDefConfHandler extends 
AbstractWebHandler {
             for (BdbBrokerConfEntity brokerConfEntity : 
inBrokerConfEntityMap.values()) {
                 brokerConfManager.confAddBrokerDefaultConfig(brokerConfEntity);
             }
-            
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+            
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"");
+
+            if (brokerIpId.size() > 0) {
+                // add ip and id relation
+                strBuffer.append(",\"data\":[");
+                brokerIpId.forEach((ip, id) -> {
+                    strBuffer.append("{\"ip\":").append(ip);
+                    strBuffer.append(",\"id\":").append(id).append("},");
+                });
+                strBuffer.deleteCharAt(strBuffer.length() - 1).append("]");
+            }
+            strBuffer.append("}");
+
         } catch (Exception e) {
             strBuffer.delete(0, strBuffer.length());
             strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")

Reply via email to