This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 512b1600afb3d9d9eb6e39ae16a0db33be3cd86c Author: EMsnap <[email protected]> AuthorDate: Wed Dec 23 10:31:47 2020 +0800 [TUBEMQ-485] add one or more broker in cluster --- .../manager/controller/node/NodeController.java | 99 +++++++++++++++++++++- .../controller/node/request/AddBrokersReq.java | 46 ++++++++++ .../controller/node/request/BrokerConf.java | 47 ++++++++++ .../apache/tubemq/manager/service/NodeService.java | 8 ++ .../apache/tubemq/manager/utils/ConvertUtils.java | 53 ++++++++++++ .../manager/controller/TestBusinessController.java | 2 + .../manager/controller/TestNodeController.java | 49 ++++++++++- 7 files changed, 300 insertions(+), 4 deletions(-) diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java index fe64b7b..7b108e7 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java @@ -19,12 +19,30 @@ package org.apache.tubemq.manager.controller.node; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.apache.tubemq.manager.controller.TubeMQResult; +import org.apache.tubemq.manager.controller.node.request.AddBrokersReq; +import org.apache.tubemq.manager.controller.node.request.BrokerConf; +import org.apache.tubemq.manager.entry.NodeEntry; +import org.apache.tubemq.manager.repository.NodeRepository; import org.apache.tubemq.manager.service.NodeService; +import org.apache.tubemq.manager.service.tube.TubeHttpResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA; +import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr; + @RestController @RequestMapping(path = "/v1/node") @Slf4j @@ -33,12 +51,17 @@ public class NodeController { public static final String NO_SUCH_METHOD = "no such method"; public static final String OP_QUERY = "op_query"; public static final String ADMIN_QUERY_CLUSTER_INFO = "admin_query_cluster_info"; - + public static final String TUBE_REQUEST_PATH = "webapi.htm"; private final Gson gson = new Gson(); + private static final CloseableHttpClient httpclient = HttpClients.createDefault(); @Autowired NodeService nodeService; + @Autowired + NodeRepository nodeRepository; + + @RequestMapping(value = "/query", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody String queryInfo(@RequestParam String type, @RequestParam String method, @@ -51,4 +74,78 @@ public class NodeController { return gson.toJson(TubeMQResult.getErrorResult(NO_SUCH_METHOD)); } + + /** + * add brokers to cluster, need to check token and + * make sure user has authorization to modify it. + */ + @RequestMapping(value = "/add", method = RequestMethod.POST) + public @ResponseBody String addBrokersToCluster( + @RequestBody AddBrokersReq req) throws Exception { + String token = req.getConfModAuthToken(); + int clusterId = req.getClusterId(); + + if (StringUtils.isNotBlank(token)) { + NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue( + clusterId); + return addToMasterAndRepo(req, masterEntry); + } else { + TubeMQResult result = new TubeMQResult(); + result.setErrCode(-1); + result.setResult(false); + result.setErrMsg("token is not correct"); + return gson.toJson(result); + } + + } + + private String addToMasterAndRepo(AddBrokersReq req, NodeEntry masterEntry) throws Exception { + + String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort() + + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req); + + log.info("start to request {}", url); + HttpGet httpGet = new HttpGet(url); + TubeMQResult defaultResult = new TubeMQResult(); + + try (CloseableHttpResponse response = httpclient.execute(httpGet)) { + TubeHttpResponse result = + gson.fromJson(new InputStreamReader(response.getEntity().getContent()), + TubeHttpResponse.class); + if (result.getCode() == 0 && result.getErrCode() == 0) { + // save brokers to db when success + saveAllBrokers(req.getBrokerJsonSet(), req.getClusterId(), masterEntry); + } else { + return result.getErrMsg(); + } + } catch (Exception ex) { + log.error("exception caught while requesting broker status", ex); + defaultResult.setErrCode(-1); + defaultResult.setResult(false); + defaultResult.setErrMsg(ex.getMessage()); + } + + return gson.toJson(defaultResult); + } + + + + + private void saveAllBrokers(List<BrokerConf> brokerConfList, int clusterId, NodeEntry masterEntry) { + List<NodeEntry> nodeEntries = new ArrayList<>(); + for (BrokerConf brokerConf : brokerConfList) { + NodeEntry node = new NodeEntry(); + node.setBroker(true); + node.setClusterId(clusterId); + node.setClusterName(masterEntry.getClusterName()); + node.setBrokerId(brokerConf.getBrokerId()); + node.setMaster(false); + node.setIp(brokerConf.getBrokerIp()); + node.setStandby(false); + node.setPort(brokerConf.getBrokerPort()); + nodeEntries.add(node); + } + nodeService.saveNodes(nodeEntries); + } + } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java new file mode 100644 index 0000000..13d10c4 --- /dev/null +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.manager.controller.node.request; + + +import lombok.Data; + +import java.util.List; + +@Data +public class AddBrokersReq { + + public String confModAuthToken; + + public String createUser; + + public int clusterId; + + /** + * admin_bath_add_broker_configure + */ + public String method; + + /** + * op_modify + */ + public String type; + + public List<BrokerConf> brokerJsonSet; + +} diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java new file mode 100644 index 0000000..fc48d47 --- /dev/null +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.manager.controller.node.request; + + +import lombok.Data; + +import java.security.SecureRandom; + +@Data +public class BrokerConf { + + public String brokerIp; + public Integer brokerPort; + public Integer brokerId; + public String deleteWhen; + public Integer numPartitions; + public Integer unflushThreshold; + public Integer unflushIntegererval; + public Integer unflushDataHold; + public boolean acceptPublish; + public boolean acceptSubscribe; + public String createUser; + public Integer brokerTLSPort; + public Integer numTopicStores; + public Integer memCacheMsgCntInK; + public Integer memCacheMsgSizeInMB; + public Integer memCacheFlushIntegervl; + +} + + diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java index 2e819cc..a9d39b3 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java @@ -292,6 +292,14 @@ public class NodeService { } + + public void saveNodes(List<NodeEntry> nodes) { + nodeRepository.saveAll(nodes); + } + + + + public void close() throws IOException { httpclient.close(); } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java new file mode 100644 index 0000000..126b347 --- /dev/null +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.manager.utils; + +import com.google.gson.Gson; +import org.apache.commons.lang3.StringUtils; + +import java.lang.reflect.Field; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class ConvertUtils { + + public static Gson gson = new Gson(); + + public static String convertReqToQueryStr(Object req) throws Exception { + List<String> queryList = new ArrayList<>(); + Class<?> clz = req.getClass(); + Field[] fields = clz.getDeclaredFields(); + for (Field field : fields) { + field.setAccessible(true); + Object o = field.get(req); + String value; + // convert list to json string + if (o instanceof List) { + value = gson.toJson(o); + } else { + value = o.toString(); + } + queryList.add(field.getName() + "=" + URLEncoder.encode( + value, UTF_8.toString())); + } + return StringUtils.join(queryList, "&"); + } +} diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java index 0dbec60..1b3c0ed 100644 --- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java +++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java @@ -96,4 +96,6 @@ public class TestBusinessController { assertThat(Objects.requireNonNull(responseEntity.getBody()).getErrCode()).isEqualTo(-1); assertTrue(responseEntity.getBody().getErrMsg().contains("exception for test")); } + + } diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java index 0b8958e..935bb62 100644 --- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java +++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java @@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.http.MediaType; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.MvcResult; @@ -37,6 +38,7 @@ import org.springframework.test.web.servlet.RequestBuilder; import java.util.List; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; @@ -94,6 +96,14 @@ public class TestNodeController { "\"standby\":[],\"broker\":[]}}]," + "\"errMsg\":\"\",\"errCode\":0,\"result\":true}"; + private NodeEntry getNodeEntry() { + NodeEntry nodeEntry = new NodeEntry(); + nodeEntry.setMaster(true); + nodeEntry.setIp("127.0.0.1"); + nodeEntry.setWebPort(8084); + return nodeEntry; + } + @Test public void testClusterInfo() throws Exception { List<NodeEntry> nodeEntries = getOneNodeEntry(); @@ -123,6 +133,39 @@ public class TestNodeController { result.getResponse().getContentType()); } - - -} + @Test + public void testAddBrokersToCluster() throws Exception { + String jsonStr = "{\n" + + "\t\"confModAuthToken\": \"abc\",\n" + + "\t\"createUser\": \"test\",\n" + + "\t\"clusterId\": 1,\n" + + "\t\"method\": \"admin_bath_add_broker_configure\",\n" + + "\t\"type\": \"op_modify\",\n" + + "\t\"brokerJsonSet\": [{\n" + + "\t\t\"brokerId\": 234,\n" + + "\t\t\"brokerIp\": \"127.0 .0 .1\",\n" + + "\t\t\"brokerPort\": 8124,\n" + + "\t\t\"numPartitions\": 3,\n" + + "\t\t\"unflushThreshold\": 55,\n" + + "\t\t\"unflushInterval\": 10000,\n" + + "\t\t\"deleteWhen\": \"0 0 6,18 * * ?\",\n" + + "\t\t\"deletePolicy\": \"delete,168\",\n" + + "\t\t\"acceptPublish\": \"true\",\n" + + "\t\t\"acceptSubscribe\": \"true\",\n" + + "\t\t\"createUser\": \"gosonzhang\",\n" + + "\t\t\"createDate\": \"20151116142135\",\n" + + "\t\t\"modifyUser\": \"gosonzhang\",\n" + + "\t\t\"modifyDate\": \"20151117161515\"\n" + + "\t}]\n" + + "\n" + + "}"; + NodeEntry nodeEntry = getNodeEntry(); + doReturn(nodeEntry).when(nodeRepository).findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class)); + RequestBuilder request = post("/v1/node/add") + .contentType(MediaType.APPLICATION_JSON).content(jsonStr); + MvcResult result = mockMvc.perform(request).andReturn(); + String resultStr = result.getResponse().getContentAsString(); + log.info("result json string is {}, response type is {}", resultStr, + result.getResponse().getContentType()); + } + }
