This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 5c69de9b540150de1d40a58363805fc259dffceb Author: EMsnap <[email protected]> AuthorDate: Thu Dec 24 11:56:39 2020 +0800 [TUBEMQ-478] delete\reload\online brokers in cluster --- .../tubemq/manager/controller/TubeMQResult.java | 2 +- .../controller/cluster/ClusterController.java | 53 ++--------- .../manager/controller/node/NodeController.java | 82 +++++++--------- .../apache/tubemq/manager/service/NodeService.java | 7 -- .../apache/tubemq/manager/utils/MasterUtils.java | 103 +++++++++++++++++++++ .../manager/controller/TestNodeController.java | 50 +--------- 6 files changed, 149 insertions(+), 148 deletions(-) diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java index 939d0a8..6e4a970 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeMQResult.java @@ -32,7 +32,7 @@ public class TubeMQResult { private boolean result = true; public static TubeMQResult getErrorResult(String errorMsg) { - return TubeMQResult.builder().errCode(1) + return TubeMQResult.builder().errCode(-1) .errMsg(errorMsg).result(false).build(); } } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java index 53e1aa3..4b58313 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java @@ -17,24 +17,17 @@ package org.apache.tubemq.manager.controller.cluster; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA; +import static org.apache.tubemq.manager.utils.MasterUtils.*; import com.google.gson.Gson; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.apache.tubemq.manager.controller.TubeMQResult; import org.apache.tubemq.manager.entry.NodeEntry; import org.apache.tubemq.manager.repository.NodeRepository; +import org.apache.tubemq.manager.utils.MasterUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.RequestBody; @@ -49,52 +42,18 @@ import org.springframework.web.bind.annotation.RestController; @Slf4j public class ClusterController { - private final CloseableHttpClient httpclient = HttpClients.createDefault(); private final Gson gson = new Gson(); - - private static final String TUBE_REQUEST_PATH = "webapi.htm"; - @Autowired private NodeRepository nodeRepository; - - private String covertMapToQueryString(Map<String, String> requestMap) throws Exception { - List<String> queryList = new ArrayList<>(); - - for (Map.Entry<String, String> entry : requestMap.entrySet()) { - queryList.add(entry.getKey() + "=" + URLEncoder.encode( - entry.getValue(), UTF_8.toString())); - } - return StringUtils.join(queryList, "&"); - } - - private String queryMaster(String url) { - log.info("start to request {}", url); - HttpGet httpGet = new HttpGet(url); - TubeMQResult defaultResult = new TubeMQResult(); - try (CloseableHttpResponse response = httpclient.execute(httpGet)) { - // return result json to response - return EntityUtils.toString(response.getEntity()); - } catch (Exception ex) { - log.error("exception caught while requesting broker status", ex); - defaultResult.setErrCode(-1); - defaultResult.setResult(false); - defaultResult.setErrMsg(ex.getMessage()); - } - return gson.toJson(defaultResult); - } + @Autowired + public MasterUtils masterUtil; @RequestMapping(value = "/query", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody String queryInfo( @RequestParam Map<String, String> queryBody) throws Exception { - int clusterId = Integer.parseInt(queryBody.get("clusterId")); - queryBody.remove("clusterId"); - NodeEntry nodeEntry = - nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId); - String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() - + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody); - return queryMaster(url); + return gson.toJson(masterUtil.redirectToMaster(queryBody)); } /** @@ -114,7 +73,7 @@ public class ClusterController { clusterId); String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(requestBody); - return queryMaster(url); + return gson.toJson(requestMaster(url)); } else { TubeMQResult result = new TubeMQResult(); result.setErrCode(-1); diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java index 7b108e7..8a6e586 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java @@ -20,28 +20,27 @@ package org.apache.tubemq.manager.controller.node; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.apache.tubemq.manager.controller.TubeMQResult; import org.apache.tubemq.manager.controller.node.request.AddBrokersReq; import org.apache.tubemq.manager.controller.node.request.BrokerConf; import org.apache.tubemq.manager.entry.NodeEntry; import org.apache.tubemq.manager.repository.NodeRepository; import org.apache.tubemq.manager.service.NodeService; -import org.apache.tubemq.manager.service.tube.TubeHttpResponse; +import org.apache.tubemq.manager.utils.MasterUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; -import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult; import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA; import static org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr; +import static org.apache.tubemq.manager.utils.MasterUtils.*; @RestController @RequestMapping(path = "/v1/node") @@ -51,7 +50,6 @@ public class NodeController { public static final String NO_SUCH_METHOD = "no such method"; public static final String OP_QUERY = "op_query"; public static final String ADMIN_QUERY_CLUSTER_INFO = "admin_query_cluster_info"; - public static final String TUBE_REQUEST_PATH = "webapi.htm"; private final Gson gson = new Gson(); private static final CloseableHttpClient httpclient = HttpClients.createDefault(); @@ -61,6 +59,8 @@ public class NodeController { @Autowired NodeRepository nodeRepository; + @Autowired + MasterUtils masterUtil; @RequestMapping(value = "/query", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE) @@ -71,7 +71,7 @@ public class NodeController { return nodeService.queryClusterInfo(clusterId); } - return gson.toJson(TubeMQResult.getErrorResult(NO_SUCH_METHOD)); + return gson.toJson(getErrorResult(NO_SUCH_METHOD)); } @@ -88,7 +88,8 @@ public class NodeController { if (StringUtils.isNotBlank(token)) { NodeEntry masterEntry = nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue( clusterId); - return addToMasterAndRepo(req, masterEntry); + TubeMQResult result = addBrokersToCluster(req, masterEntry); + return gson.toJson(result); } else { TubeMQResult result = new TubeMQResult(); result.setErrCode(-1); @@ -99,53 +100,38 @@ public class NodeController { } - private String addToMasterAndRepo(AddBrokersReq req, NodeEntry masterEntry) throws Exception { - String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort() - + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req); + /** + * add brokers to cluster, need to check token and + * make sure user has authorization to modify it. + */ + @RequestMapping(value = "/online", method = RequestMethod.GET) + public @ResponseBody String onlineBrokers( + @RequestParam Map<String, String> queryBody) throws Exception { + return gson.toJson(masterUtil.redirectToMaster(queryBody)); + } - log.info("start to request {}", url); - HttpGet httpGet = new HttpGet(url); - TubeMQResult defaultResult = new TubeMQResult(); - - try (CloseableHttpResponse response = httpclient.execute(httpGet)) { - TubeHttpResponse result = - gson.fromJson(new InputStreamReader(response.getEntity().getContent()), - TubeHttpResponse.class); - if (result.getCode() == 0 && result.getErrCode() == 0) { - // save brokers to db when success - saveAllBrokers(req.getBrokerJsonSet(), req.getClusterId(), masterEntry); - } else { - return result.getErrMsg(); - } - } catch (Exception ex) { - log.error("exception caught while requesting broker status", ex); - defaultResult.setErrCode(-1); - defaultResult.setResult(false); - defaultResult.setErrMsg(ex.getMessage()); - } - return gson.toJson(defaultResult); + @RequestMapping(value = "/reload", method = RequestMethod.GET) + public @ResponseBody String reloadBrokers( + @RequestParam Map<String, String> queryBody) throws Exception { + return gson.toJson(masterUtil.redirectToMaster(queryBody)); } + @RequestMapping(value = "/delete", method = RequestMethod.GET) + public @ResponseBody String deleteBrokers( + @RequestParam Map<String, String> queryBody) throws Exception { + TubeMQResult result = masterUtil.redirectToMaster(queryBody); + return gson.toJson(result); + } - - private void saveAllBrokers(List<BrokerConf> brokerConfList, int clusterId, NodeEntry masterEntry) { - List<NodeEntry> nodeEntries = new ArrayList<>(); - for (BrokerConf brokerConf : brokerConfList) { - NodeEntry node = new NodeEntry(); - node.setBroker(true); - node.setClusterId(clusterId); - node.setClusterName(masterEntry.getClusterName()); - node.setBrokerId(brokerConf.getBrokerId()); - node.setMaster(false); - node.setIp(brokerConf.getBrokerIp()); - node.setStandby(false); - node.setPort(brokerConf.getBrokerPort()); - nodeEntries.add(node); - } - nodeService.saveNodes(nodeEntries); + private TubeMQResult addBrokersToCluster(AddBrokersReq req, NodeEntry masterEntry) throws Exception { + String url = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort() + + "/" + TUBE_REQUEST_PATH + "?" + convertReqToQueryStr(req); + TubeMQResult tubeMQResult = requestMaster(url); + return tubeMQResult; } + } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java index a9d39b3..836fe8a 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java @@ -293,13 +293,6 @@ public class NodeService { - public void saveNodes(List<NodeEntry> nodes) { - nodeRepository.saveAll(nodes); - } - - - - public void close() throws IOException { httpclient.close(); } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java new file mode 100644 index 0000000..95def0c --- /dev/null +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/MasterUtils.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.tubemq.manager.utils; + +import com.google.gson.Gson; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.tubemq.manager.controller.TubeMQResult; +import org.apache.tubemq.manager.entry.NodeEntry; +import org.apache.tubemq.manager.repository.NodeRepository; +import org.apache.tubemq.manager.service.tube.TubeHttpResponse; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.io.InputStreamReader; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.tubemq.manager.controller.TubeMQResult.getErrorResult; +import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA; + + +@Slf4j +@Component +public class MasterUtils { + + private static CloseableHttpClient httpclient = HttpClients.createDefault(); + private static Gson gson = new Gson(); + public static final String TUBE_REQUEST_PATH = "webapi.htm"; + + @Autowired + NodeRepository nodeRepository; + + public static String covertMapToQueryString(Map<String, String> requestMap) throws Exception { + List<String> queryList = new ArrayList<>(); + + for (Map.Entry<String, String> entry : requestMap.entrySet()) { + queryList.add(entry.getKey() + "=" + URLEncoder.encode( + entry.getValue(), UTF_8.toString())); + } + return StringUtils.join(queryList, "&"); + } + + + + public static TubeMQResult requestMaster(String url) throws Exception { + + log.info("start to request {}", url); + HttpGet httpGet = new HttpGet(url); + TubeMQResult defaultResult = new TubeMQResult(); + + try (CloseableHttpResponse response = httpclient.execute(httpGet)) { + TubeHttpResponse tubeResponse = + gson.fromJson(new InputStreamReader(response.getEntity().getContent()), + TubeHttpResponse.class); + if (tubeResponse.getCode() == 0 && tubeResponse.getErrCode() == 0) { + return defaultResult; + } else { + defaultResult = getErrorResult(tubeResponse.getErrMsg()); + } + } catch (Exception ex) { + log.error("exception caught while requesting broker status", ex); + defaultResult = getErrorResult(ex.getMessage()); + } + return defaultResult; + } + + + + + public TubeMQResult redirectToMaster(Map<String, String> queryBody) throws Exception { + int clusterId = Integer.parseInt(queryBody.get("clusterId")); + queryBody.remove("clusterId"); + NodeEntry nodeEntry = + nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId); + String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort() + + "/" + TUBE_REQUEST_PATH + "?" + covertMapToQueryString(queryBody); + return requestMaster(url); + } +} diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java index 935bb62..9b8ce64 100644 --- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java +++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java @@ -30,13 +30,17 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.MvcResult; import org.springframework.test.web.servlet.RequestBuilder; import java.util.List; +import java.util.Objects; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; @@ -96,14 +100,6 @@ public class TestNodeController { "\"standby\":[],\"broker\":[]}}]," + "\"errMsg\":\"\",\"errCode\":0,\"result\":true}"; - private NodeEntry getNodeEntry() { - NodeEntry nodeEntry = new NodeEntry(); - nodeEntry.setMaster(true); - nodeEntry.setIp("127.0.0.1"); - nodeEntry.setWebPort(8084); - return nodeEntry; - } - @Test public void testClusterInfo() throws Exception { List<NodeEntry> nodeEntries = getOneNodeEntry(); @@ -132,40 +128,4 @@ public class TestNodeController { log.info("result json string is {}, response type is {}", resultStr, result.getResponse().getContentType()); } - - @Test - public void testAddBrokersToCluster() throws Exception { - String jsonStr = "{\n" + - "\t\"confModAuthToken\": \"abc\",\n" + - "\t\"createUser\": \"test\",\n" + - "\t\"clusterId\": 1,\n" + - "\t\"method\": \"admin_bath_add_broker_configure\",\n" + - "\t\"type\": \"op_modify\",\n" + - "\t\"brokerJsonSet\": [{\n" + - "\t\t\"brokerId\": 234,\n" + - "\t\t\"brokerIp\": \"127.0 .0 .1\",\n" + - "\t\t\"brokerPort\": 8124,\n" + - "\t\t\"numPartitions\": 3,\n" + - "\t\t\"unflushThreshold\": 55,\n" + - "\t\t\"unflushInterval\": 10000,\n" + - "\t\t\"deleteWhen\": \"0 0 6,18 * * ?\",\n" + - "\t\t\"deletePolicy\": \"delete,168\",\n" + - "\t\t\"acceptPublish\": \"true\",\n" + - "\t\t\"acceptSubscribe\": \"true\",\n" + - "\t\t\"createUser\": \"gosonzhang\",\n" + - "\t\t\"createDate\": \"20151116142135\",\n" + - "\t\t\"modifyUser\": \"gosonzhang\",\n" + - "\t\t\"modifyDate\": \"20151117161515\"\n" + - "\t}]\n" + - "\n" + - "}"; - NodeEntry nodeEntry = getNodeEntry(); - doReturn(nodeEntry).when(nodeRepository).findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class)); - RequestBuilder request = post("/v1/node/add") - .contentType(MediaType.APPLICATION_JSON).content(jsonStr); - MvcResult result = mockMvc.perform(request).andReturn(); - String resultStr = result.getResponse().getContentAsString(); - log.info("result json string is {}, response type is {}", resultStr, - result.getResponse().getContentType()); - } - } +}
