This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
new a380eb9 [TUBEMQ-481] add new feature - clone broker (#374)
a380eb9 is described below
commit a380eb924c0c31306d5827f44dc0c13de45a809a
Author: EMsnap <[email protected]>
AuthorDate: Tue Dec 29 00:39:13 2020 -0800
[TUBEMQ-481] add new feature - clone broker (#374)
* [TUBEMQ-481] add new feature - clone broker
* [TUBEMQ-481] add comment
---
tubemq-manager/pom.xml | 6 +
.../tubemq/manager/controller/TubeMQResult.java | 3 +-
.../manager/controller/node/NodeController.java | 40 ++++++-
.../controller/node/request/AddBrokersReq.java | 14 +++
.../{AddBrokersReq.java => AddTopicReq.java} | 32 ++---
.../request/{AddBrokersReq.java => BaseReq.java} | 30 +----
.../{AddBrokersReq.java => CloneBrokersReq.java} | 28 ++---
.../{AddBrokersReq.java => QueryBrokerCfgReq.java} | 35 +++---
.../apache/tubemq/manager/service/NodeService.java | 133 +++++++++++++++++++--
.../tubemq/manager/service/TubeMQHttpConst.java | 5 +
.../tube/AddBrokerResult.java} | 30 ++---
.../tube/AddTopicsResult.java} | 27 +----
.../node/request => service/tube}/BrokerConf.java | 27 ++++-
.../tube/BrokerStatusInfo.java} | 30 ++---
.../tube/IpIdRelation.java} | 29 +----
.../tube/TubeHttpBrokerCfgInfo.java} | 42 +++----
.../service/tube/TubeHttpClusterInfoList.java | 4 +-
.../apache/tubemq/manager/utils/ConvertUtils.java | 1 +
.../web/handler/WebBrokerDefConfHandler.java | 20 +++-
19 files changed, 318 insertions(+), 218 deletions(-)
diff --git a/tubemq-manager/pom.xml b/tubemq-manager/pom.xml
index 18e692c..eaf220a 100644
--- a/tubemq-manager/pom.xml
+++ b/tubemq-manager/pom.xml
@@ -49,6 +49,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>4.3</version>
+ </dependency>
+
+ <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
index 6e4a970..d8e033e 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java
@@ -30,9 +30,10 @@ public class TubeMQResult {
private String errMsg = "";
private int errCode = 0;
private boolean result = true;
+ private String data;
public static TubeMQResult getErrorResult(String errorMsg) {
return TubeMQResult.builder().errCode(-1)
- .errMsg(errorMsg).result(false).build();
+ .errMsg(errorMsg).result(false).data("").build();
}
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index ab5567d..2c81825 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -17,25 +17,35 @@
package org.apache.tubemq.manager.controller.node;
+import com.google.common.collect.Lists;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
import org.apache.tubemq.manager.service.NodeService;
+import org.apache.tubemq.manager.service.tube.*;
import org.apache.tubemq.manager.utils.MasterUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult;
-import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+import static
org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.*;
import static
org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
import static org.apache.tubemq.manager.utils.MasterUtils.*;
@@ -59,6 +69,13 @@ public class NodeController {
@Autowired
MasterUtils masterUtil;
+ /**
+ * query brokers in certain cluster
+ * @param type
+ * @param method
+ * @param clusterId
+ * @return
+ */
@RequestMapping(value = "/query/clusterInfo", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_VALUE)
public @ResponseBody String queryInfo(@RequestParam String type,
@RequestParam String method,
@@ -95,13 +112,29 @@ public class NodeController {
return queryMaster(url);
}
+ /**
+ * clone source broker to generate brokers with the same config and copy the topics in it.
+ * @param req
+ * @return
+ * @throws Exception
+ */
+ @RequestMapping(value = "/clone", method = RequestMethod.POST)
+ public @ResponseBody String cloneBrokers(
+ @RequestBody CloneBrokersReq req) throws Exception {
+ int clusterId = req.getClusterId();
+ TubeMQResult tubeResult = nodeService.cloneBrokersWithTopic(req,
clusterId);
+ return gson.toJson(tubeResult);
+ }
+
+
+
/**
* add brokers to cluster, need to check token and
* make sure user has authorization to modify it.
*/
@RequestMapping(value = "/add", method = RequestMethod.POST)
- public @ResponseBody String addBrokersToCluster(
+ public @ResponseBody String addBrokers(
@RequestBody AddBrokersReq req) throws Exception {
String token = req.getConfModAuthToken();
int clusterId = req.getClusterId();
@@ -175,8 +208,6 @@ public class NodeController {
return gson.toJson(result);
}
-
-
private TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry
masterEntry) throws Exception {
String url = SCHEMA + masterEntry.getIp() + ":" +
masterEntry.getWebPort()
+ "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
@@ -184,5 +215,4 @@ public class NodeController {
return tubeMQResult;
}
-
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
index 13d10c4..66281e6 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
@@ -18,10 +18,14 @@
package org.apache.tubemq.manager.controller.node.request;
+import lombok.Builder;
import lombok.Data;
+import org.apache.tubemq.manager.service.tube.BrokerConf;
import java.util.List;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.*;
+
@Data
public class AddBrokersReq {
@@ -43,4 +47,14 @@ public class AddBrokersReq {
public List<BrokerConf> brokerJsonSet;
+ public static AddBrokersReq getAddBrokerReq(String token, int clusterId) {
+ AddBrokersReq req = new AddBrokersReq();
+ req.setClusterId(clusterId);
+ req.setMethod(BATCH_ADD_BROKER);
+ req.setType(OP_MODIFY);
+ req.setCreateUser(WEB_API);
+ req.setConfModAuthToken(token);
+ return req;
+ }
+
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
similarity index 74%
copy from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
index 13d10c4..3eb73ab 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,27 +20,19 @@ package org.apache.tubemq.manager.controller.node.request;
import lombok.Data;
-import java.util.List;
-
@Data
-public class AddBrokersReq {
-
- public String confModAuthToken;
-
+public class AddTopicReq extends BaseReq {
public String createUser;
-
- public int clusterId;
-
- /**
- * admin_bath_add_broker_configure
- */
+ public String deleteWhen;
+ public Integer unflushThreshold;
+ public Boolean acceptPublish;
+ public Integer numPartitions;
+ public String deletePolicy;
+ public Integer unflushInterval;
+ public Boolean acceptSubscribe;
public String method;
-
- /**
- * op_modify
- */
public String type;
-
- public List<BrokerConf> brokerJsonSet;
-
+ public String brokerId;
+ public String confModAuthToken;
+ public String topicName;
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
similarity index 74%
copy from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
index 13d10c4..fe3a32d 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BaseReq.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,30 +17,8 @@
package org.apache.tubemq.manager.controller.node.request;
-
-import lombok.Data;
-
-import java.util.List;
-
-@Data
-public class AddBrokersReq {
-
- public String confModAuthToken;
-
- public String createUser;
-
- public int clusterId;
-
- /**
- * admin_bath_add_broker_configure
- */
- public String method;
-
- /**
- * op_modify
- */
+public class BaseReq {
public String type;
-
- public List<BrokerConf> brokerJsonSet;
-
+ public Integer clusterId;
+ public String method;
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneBrokersReq.java
similarity index 81%
copy from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneBrokersReq.java
index 13d10c4..f59c5d5 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneBrokersReq.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,30 +17,18 @@
package org.apache.tubemq.manager.controller.node.request;
-
import lombok.Data;
+import org.apache.tubemq.manager.entry.TopicEntry;
import java.util.List;
-@Data
-public class AddBrokersReq {
+@Data
+public class CloneBrokersReq {
+ Integer sourceBrokerId;
+ List<String> targetIps;
+ List<AddTopicReq> addTopicReqs;
public String confModAuthToken;
-
public String createUser;
-
public int clusterId;
-
- /**
- * admin_bath_add_broker_configure
- */
- public String method;
-
- /**
- * op_modify
- */
- public String type;
-
- public List<BrokerConf> brokerJsonSet;
-
-}
+}
\ No newline at end of file
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/QueryBrokerCfgReq.java
similarity index 61%
copy from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/QueryBrokerCfgReq.java
index 13d10c4..d60c375 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/QueryBrokerCfgReq.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -17,30 +17,31 @@
package org.apache.tubemq.manager.controller.node.request;
-
import lombok.Data;
-import java.util.List;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_QUERY;
+import static
org.apache.tubemq.manager.service.TubeMQHttpConst.QUERY_BROKER_CONFIG;
@Data
-public class AddBrokersReq {
-
- public String confModAuthToken;
-
- public String createUser;
+public class QueryBrokerCfgReq {
- public int clusterId;
-
- /**
- * admin_bath_add_broker_configure
- */
public String method;
- /**
- * op_modify
- */
+ public Integer brokerId;
+
public String type;
- public List<BrokerConf> brokerJsonSet;
+ public boolean withDetail;
+
+ public boolean withTopic;
+ public static QueryBrokerCfgReq getReq(Integer brokerId) {
+ QueryBrokerCfgReq req = new QueryBrokerCfgReq();
+ req.setBrokerId(brokerId);
+ req.setMethod(QUERY_BROKER_CONFIG);
+ req.setType(OP_QUERY);
+ req.setWithTopic(false);
+ req.setWithDetail(true);
+ return req;
+ }
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index 836fe8a..ac60aee 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -18,34 +18,37 @@
package org.apache.tubemq.manager.service;
+import static
org.apache.tubemq.manager.controller.node.request.AddBrokersReq.getAddBrokerReq;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.ADD_TUBE_TOPIC;
import static
org.apache.tubemq.manager.service.TubeMQHttpConst.BROKER_RUN_STATUS;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.RELOAD_BROKER;
import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
import static
org.apache.tubemq.manager.service.TubeMQHttpConst.TOPIC_CONFIG_INFO;
+import static
org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
+import static org.apache.tubemq.manager.utils.MasterUtils.*;
+import com.google.common.collect.Lists;
import com.google.gson.Gson;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
-import org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList;
-import org.apache.tubemq.manager.service.tube.TubeHttpClusterInfoList;
-import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
-import org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList;
+import org.apache.tubemq.manager.service.tube.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -120,6 +123,106 @@ public class NodeService {
}
+ public TubeMQResult cloneBrokersWithTopic(CloneBrokersReq req, int
clusterId) throws Exception {
+
+ // 1. query source broker config
+ QueryBrokerCfgReq queryReq =
QueryBrokerCfgReq.getReq(req.getSourceBrokerId());
+ NodeEntry masterEntry =
nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+ clusterId);
+ BrokerStatusInfo brokerStatusInfo = getBrokerStatusInfo(queryReq,
masterEntry);
+
+ // 2. use source broker config to clone brokers
+ BrokerConf sourceBrokerConf = brokerStatusInfo.getData().get(0);
+ AddBrokersReq addBrokersReq = getBatchAddBrokersReq(req, clusterId,
sourceBrokerConf);
+
+ // 3. request master, return broker ids generated by master
+ AddBrokerResult addBrokerResult =
addBrokersToClusterWithId(addBrokersReq, masterEntry);
+
+ // might have duplicate brokers
+ if (addBrokerResult.getErrCode() != 0) {
+ return TubeMQResult.getErrorResult(addBrokerResult.getErrMsg());
+ }
+ List<Integer> brokerIds = getBrokerIds(addBrokerResult);
+ List<AddTopicReq> addTopicReqs = req.getAddTopicReqs();
+
+ // 4. add topic to brokers
+ return addTopicToBrokers(masterEntry, brokerIds, addTopicReqs);
+ }
+
+ private TubeMQResult addTopicToBrokers(NodeEntry masterEntry,
List<Integer> brokerIds, List<AddTopicReq> addTopicReqs) {
+ TubeMQResult tubeResult = new TubeMQResult();
+ AddTopicsResult addTopicsResult = new AddTopicsResult();
+
+ if (CollectionUtils.isEmpty(addTopicReqs)) {
+ return tubeResult;
+ }
+ addTopicReqs.forEach(addTopicReq -> {
+ try {
+ String brokerStr = StringUtils.join(brokerIds, ",");
+ addTopicReq.setBrokerId(brokerStr);
+ TubeMQResult result = addTopicToBrokers(addTopicReq,
masterEntry);
+ if (result.getErrCode() == 0) {
+
addTopicsResult.getSuccessTopics().add(addTopicReq.getTopicName());
+ } else {
+
addTopicsResult.getFailTopics().add(addTopicReq.getTopicName());
+ }
+ } catch (Exception e) {
+ log.error("add topic to brokers fail with exception", e);
+
addTopicsResult.getFailTopics().add(addTopicReq.getTopicName());
+ }
+ });
+
+ tubeResult.setData(gson.toJson(addTopicsResult));
+ return tubeResult;
+ }
+
+ private List<Integer> getBrokerIds(AddBrokerResult addBrokerResult) {
+ List<IpIdRelation> ipids = addBrokerResult.getData();
+ List<Integer> brokerIds = Lists.newArrayList();
+ for (IpIdRelation ipid : ipids) {
+ brokerIds.add(ipid.getId());
+ }
+ return brokerIds;
+ }
+
+ private AddBrokersReq getBatchAddBrokersReq(CloneBrokersReq req, int
clusterId, BrokerConf sourceBrokerConf) {
+ AddBrokersReq addBrokersReq =
getAddBrokerReq(req.getConfModAuthToken(), clusterId);
+
+ // generate add brokers req using given target broker ips
+ List<BrokerConf> brokerConfs = Lists.newArrayList();
+ req.getTargetIps().forEach(ip -> {
+ BrokerConf brokerConf = new BrokerConf(sourceBrokerConf);
+ brokerConf.setBrokerIp(ip);
+ brokerConf.setBrokerId(0);
+ brokerConfs.add(brokerConf);
+ });
+ addBrokersReq.setBrokerJsonSet(brokerConfs);
+ return addBrokersReq;
+ }
+
+ private BrokerStatusInfo getBrokerStatusInfo(QueryBrokerCfgReq queryReq,
NodeEntry masterEntry) throws Exception {
+ String url = SCHEMA + masterEntry.getIp() + ":" +
masterEntry.getWebPort()
+ + "/" + TUBE_REQUEST_PATH + "?" +
convertReqToQueryStr(queryReq);
+ BrokerStatusInfo brokerStatusInfo = gson.fromJson(queryMaster(url),
+ BrokerStatusInfo.class);
+ return brokerStatusInfo;
+ }
+
+ public TubeMQResult addTopicToBrokers(AddTopicReq req, NodeEntry
masterEntry) throws Exception {
+ String url = SCHEMA + masterEntry.getIp() + ":" +
masterEntry.getWebPort()
+ + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+ return requestMaster(url);
+ }
+
+
+ public TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry
masterEntry) throws Exception {
+ String url = SCHEMA + masterEntry.getIp() + ":" +
masterEntry.getWebPort()
+ + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+ TubeMQResult tubeMQResult = requestMaster(url);
+ return tubeMQResult;
+ }
+
+
private boolean configBrokersForTopics(NodeEntry nodeEntry,
Set<String> topics, List<Integer> brokerList, int maxBrokers) {
List<Integer> finalBrokerList = brokerList.subList(0, maxBrokers);
@@ -296,4 +399,18 @@ public class NodeService {
public void close() throws IOException {
httpclient.close();
}
+
+ public AddBrokerResult addBrokersToClusterWithId(AddBrokersReq req,
NodeEntry masterEntry) throws Exception {
+
+ String url = SCHEMA + masterEntry.getIp() + ":" +
masterEntry.getWebPort()
+ + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req);
+ HttpGet httpget = new HttpGet(url);
+ try (CloseableHttpResponse response = httpclient.execute(httpget)) {
+ return gson.fromJson(new
InputStreamReader(response.getEntity().getContent()),
+ AddBrokerResult.class);
+ } catch (Exception ex) {
+ log.error("exception caught while requesting broker status", ex);
+ }
+ return null;
+ }
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
index d1d4949..71be4ec 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
@@ -27,4 +27,9 @@ public class TubeMQHttpConst {
"/webapi.htm?type=op_modify&method=admin_add_new_topic_record";
public static final String RELOAD_BROKER =
"/webapi.htm?type=op_modify&method=admin_reload_broker_configure";
+ public static final String QUERY_BROKER_CONFIG =
"admin_query_broker_configure";
+ public static final String OP_QUERY = "op_query";
+ public static final String OP_MODIFY = "op_modify";
+ public static final String BATCH_ADD_BROKER =
"admin_bath_add_broker_configure";
+ public static final String WEB_API = "webapi";
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddBrokerResult.java
similarity index 69%
copy from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddBrokerResult.java
index 13d10c4..f43074a 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddBrokerResult.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,32 +15,16 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
-
+package org.apache.tubemq.manager.service.tube;
import lombok.Data;
import java.util.List;
@Data
-public class AddBrokersReq {
-
- public String confModAuthToken;
-
- public String createUser;
-
- public int clusterId;
-
- /**
- * admin_bath_add_broker_configure
- */
- public String method;
-
- /**
- * op_modify
- */
- public String type;
-
- public List<BrokerConf> brokerJsonSet;
-
+public class AddBrokerResult {
+ private int code;
+ private String errMsg;
+ private int errCode;
+ private List<IpIdRelation> data;
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddTopicsResult.java
similarity index 69%
copy from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddTopicsResult.java
index 13d10c4..6cd3b01 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/AddTopicsResult.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,32 +15,17 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
-
+package org.apache.tubemq.manager.service.tube;
+import com.google.common.collect.Lists;
import lombok.Data;
import java.util.List;
@Data
-public class AddBrokersReq {
-
- public String confModAuthToken;
-
- public String createUser;
-
- public int clusterId;
-
- /**
- * admin_bath_add_broker_configure
- */
- public String method;
-
- /**
- * op_modify
- */
- public String type;
+public class AddTopicsResult {
- public List<BrokerConf> brokerJsonSet;
+ public List<String> failTopics = Lists.newArrayList();
+ public List<String> successTopics = Lists.newArrayList();
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerConf.java
similarity index 58%
copy from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
copy to
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerConf.java
index fc48d47..5dc908c 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerConf.java
@@ -15,12 +15,9 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
-
-
+package org.apache.tubemq.manager.service.tube;
import lombok.Data;
-import java.security.SecureRandom;
@Data
public class BrokerConf {
@@ -41,6 +38,28 @@ public class BrokerConf {
public Integer memCacheMsgCntInK;
public Integer memCacheMsgSizeInMB;
public Integer memCacheFlushIntegervl;
+ public String deletePolicy;
+
+ public BrokerConf(BrokerConf other) {
+ this.brokerIp = other.brokerIp;
+ this.brokerPort = other.brokerPort;
+ this.brokerId = other.brokerId;
+ this.deleteWhen = other.deleteWhen;
+ this.numPartitions = other.numPartitions;
+ this.unflushThreshold = other.unflushThreshold;
+ this.unflushIntegererval = other.unflushIntegererval;
+ this.unflushDataHold = other.unflushDataHold;
+ this.acceptPublish = other.acceptPublish;
+ this.acceptSubscribe = other.acceptSubscribe;
+ this.createUser = other.createUser;
+ this.brokerTLSPort = other.brokerTLSPort;
+ this.numTopicStores = other.numTopicStores;
+ this.memCacheMsgCntInK = other.memCacheMsgCntInK;
+ this.memCacheMsgSizeInMB = other.memCacheMsgSizeInMB;
+ this.memCacheFlushIntegervl = other.memCacheFlushIntegervl;
+ this.deletePolicy = other.deletePolicy;
+ }
+
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
similarity index 69%
copy from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
index 13d10c4..11e806e 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/BrokerStatusInfo.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,32 +15,16 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
-
+package org.apache.tubemq.manager.service.tube;
import lombok.Data;
import java.util.List;
@Data
-public class AddBrokersReq {
-
- public String confModAuthToken;
-
- public String createUser;
-
- public int clusterId;
-
- /**
- * admin_bath_add_broker_configure
- */
- public String method;
-
- /**
- * op_modify
- */
- public String type;
-
- public List<BrokerConf> brokerJsonSet;
-
+public class BrokerStatusInfo {
+ public int code;
+ public String errMsg;
+ // total broker configuration info list of brokers.
+ public List<BrokerConf> data;
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/IpIdRelation.java
similarity index 67%
copy from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
copy to
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/IpIdRelation.java
index 13d10c4..963b79e 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/IpIdRelation.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,32 +15,13 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.service.tube;
import lombok.Data;
-import java.util.List;
-
@Data
-public class AddBrokersReq {
-
- public String confModAuthToken;
-
- public String createUser;
-
- public int clusterId;
-
- /**
- * admin_bath_add_broker_configure
- */
- public String method;
-
- /**
- * op_modify
- */
- public String type;
-
- public List<BrokerConf> brokerJsonSet;
-
+public class IpIdRelation {
+ public String ip;
+ public Integer id;
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
similarity index 80%
rename from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
rename to
tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
index fc48d47..5debb19 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerCfgInfo.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,33 +15,29 @@
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.node.request;
+package org.apache.tubemq.manager.service.tube;
-import lombok.Data;
-
-import java.security.SecureRandom;
-
-@Data
-public class BrokerConf {
-
- public String brokerIp;
- public Integer brokerPort;
- public Integer brokerId;
- public String deleteWhen;
- public Integer numPartitions;
- public Integer unflushThreshold;
- public Integer unflushIntegererval;
- public Integer unflushDataHold;
+public class TubeHttpBrokerCfgInfo {
public boolean acceptPublish;
public boolean acceptSubscribe;
- public String createUser;
+ public Integer brokerId;
+ public String brokerIp;
+ public Integer brokerPort;
public Integer brokerTLSPort;
- public Integer numTopicStores;
+ public String createDate;
+ public String createUser;
+ public String deletePolicy;
+ public String deleteWhen;
+ public String modifyDate;
+ public String modifyUser;
+ public Integer memCacheFlushIntvl;
public Integer memCacheMsgCntInK;
public Integer memCacheMsgSizeInMB;
- public Integer memCacheFlushIntegervl;
-
+ public Integer numPartitions;
+ public Integer numTopicStores;
+ public Integer unflushDataHold;
+ public Integer unflushInterval;
+ public Integer unflushThreshold;
+ public boolean hasTLSPort;
}
-
-
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
index 9a36de9..4da3f04 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpClusterInfoList.java
@@ -28,7 +28,7 @@ import java.util.Map;
@Data
public class TubeHttpClusterInfoList extends TubeMQResult {
- private List<ClusterData> data = new ArrayList<>();
+ private List<ClusterData> clusterData = new ArrayList<>();
@Data
@AllArgsConstructor
@@ -63,7 +63,7 @@ public class TubeHttpClusterInfoList extends TubeMQResult {
ClusterData.ClusterInfo singleClusterInfo =
getSingleClusterInfo(entries);
ClusterData clusterData =
new ClusterData(id,
entries.get(0).getClusterName(), singleClusterInfo);
- clusterInfoList.getData().add(clusterData);
+ clusterInfoList.getClusterData().add(clusterData);
}
);
return clusterInfoList;
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
index 126b347..f93819f 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -40,6 +40,7 @@ public class ConvertUtils {
Object o = field.get(req);
String value;
// convert list to json string
+ if (o == null) continue;
if (o instanceof List) {
value = gson.toJson(o);
} else {
diff --git
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
index befb362..d967af3 100644
---
a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
+++
b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebBrokerDefConfHandler.java
@@ -18,6 +18,8 @@
package org.apache.tubemq.server.master.web.handler;
import static java.lang.Math.abs;
+
+import com.google.common.collect.Maps;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
@@ -353,6 +355,9 @@ public class WebBrokerDefConfHandler extends
AbstractWebHandler {
HashMap<String, BdbBrokerConfEntity> inBrokerConfEntityMap = new
HashMap<>();
ConcurrentHashMap<Integer, BdbBrokerConfEntity>
bdbBrokerConfEntityMap =
brokerConfManager.getBrokerConfStoreMap();
+ // save broker ip and id relation.
+ Map<String, Integer> brokerIpId = Maps.newHashMap();
+
for (int count = 0; count < brokerJsonArray.size(); count++) {
Map<String, String> jsonObject = brokerJsonArray.get(count);
try {
@@ -415,6 +420,7 @@ public class WebBrokerDefConfHandler extends
AbstractWebHandler {
final String inputKey =
strBuffer.append(brokerId).append("-")
.append(brokerIp).append("-").append(brokerPort).toString();
strBuffer.delete(0, strBuffer.length());
+ brokerIpId.put(brokerIp, brokerId);
final String deleteWhen =
WebParameterUtils.validDecodeStringParameter("deleteWhen",
jsonObject.get("deleteWhen"),
TServerConstants.CFG_DELETEWHEN_MAX_LENGTH, false, "0 0 6,18 * * ?");
@@ -494,7 +500,19 @@ public class WebBrokerDefConfHandler extends
AbstractWebHandler {
for (BdbBrokerConfEntity brokerConfEntity :
inBrokerConfEntityMap.values()) {
brokerConfManager.confAddBrokerDefaultConfig(brokerConfEntity);
}
-
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}");
+
strBuffer.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"");
+
+ if (brokerIpId.size() > 0) {
+ // add ip and id relation; ip is emitted as a quoted JSON string, id as a number
+ strBuffer.append(",\"data\":[");
+ brokerIpId.forEach((ip, id) -> {
+ strBuffer.append("{\"ip\":\"").append(ip);
+ strBuffer.append("\",\"id\":").append(id).append("},");
+ });
+ strBuffer.deleteCharAt(strBuffer.length() - 1).append("]");
+ }
+ strBuffer.append("}");
+
} catch (Exception e) {
strBuffer.delete(0, strBuffer.length());
strBuffer.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"")