This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 512b1600afb3d9d9eb6e39ae16a0db33be3cd86c
Author: EMsnap <[email protected]>
AuthorDate: Wed Dec 23 10:31:47 2020 +0800

    [TUBEMQ-485] add one or more broker in cluster
---
 .../manager/controller/node/NodeController.java    | 99 +++++++++++++++++++++-
 .../controller/node/request/AddBrokersReq.java     | 46 ++++++++++
 .../controller/node/request/BrokerConf.java        | 47 ++++++++++
 .../apache/tubemq/manager/service/NodeService.java |  8 ++
 .../apache/tubemq/manager/utils/ConvertUtils.java  | 53 ++++++++++++
 .../manager/controller/TestBusinessController.java |  2 +
 .../manager/controller/TestNodeController.java     | 49 ++++++++++-
 7 files changed, 300 insertions(+), 4 deletions(-)

diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
index fe64b7b..7b108e7 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/NodeController.java
@@ -19,12 +19,30 @@ package org.apache.tubemq.manager.controller.node;
 
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.BrokerConf;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.NodeService;
+import org.apache.tubemq.manager.service.tube.TubeHttpResponse;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.SCHEMA;
+import static 
org.apache.tubemq.manager.utils.ConvertUtils.convertReqToQueryStr;
+
 @RestController
 @RequestMapping(path = "/v1/node")
 @Slf4j
@@ -33,12 +51,17 @@ public class NodeController {
     public static final String NO_SUCH_METHOD = "no such method";
     public static final String OP_QUERY = "op_query";
     public static final String ADMIN_QUERY_CLUSTER_INFO = 
"admin_query_cluster_info";
-
+    public static final String TUBE_REQUEST_PATH = "webapi.htm";
     private final Gson gson = new Gson();
+    private static final CloseableHttpClient httpclient = 
HttpClients.createDefault();
 
     @Autowired
     NodeService nodeService;
 
+    @Autowired
+    NodeRepository nodeRepository;
+
+
     @RequestMapping(value = "/query", method = RequestMethod.GET,
             produces = MediaType.APPLICATION_JSON_VALUE)
     public @ResponseBody String queryInfo(@RequestParam String type, 
@RequestParam String method,
@@ -51,4 +74,78 @@ public class NodeController {
         return gson.toJson(TubeMQResult.getErrorResult(NO_SUCH_METHOD));
     }
 
+
+    /**
+     * Adds one or more brokers to a cluster.
+     *
+     * <p>The request is rejected with an error result when its
+     * {@code confModAuthToken} is blank or when no master node is registered
+     * for the requested cluster. Otherwise it is forwarded to the cluster
+     * master and, if the master reports success, the brokers are saved.
+     *
+     * <p>NOTE(review): this method only checks that the token is non-blank;
+     * presumably the master validates the token itself - confirm.
+     *
+     * @param req the add-brokers request body
+     * @return a JSON string describing the outcome
+     * @throws Exception if the request cannot be converted into a query string
+     */
+    @RequestMapping(value = "/add", method = RequestMethod.POST)
+    public @ResponseBody String addBrokersToCluster(
+            @RequestBody AddBrokersReq req) throws Exception {
+        String token = req.getConfModAuthToken();
+        int clusterId = req.getClusterId();
+
+        if (StringUtils.isBlank(token)) {
+            TubeMQResult result = new TubeMQResult();
+            result.setErrCode(-1);
+            result.setResult(false);
+            result.setErrMsg("token is not correct");
+            return gson.toJson(result);
+        }
+
+        NodeEntry masterEntry =
+                nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
+        if (masterEntry == null) {
+            // Without a master there is nobody to forward the request to;
+            // answer with an error result instead of failing with an NPE.
+            TubeMQResult result = new TubeMQResult();
+            result.setErrCode(-1);
+            result.setResult(false);
+            result.setErrMsg("no master node found for cluster " + clusterId);
+            return gson.toJson(result);
+        }
+        return addToMasterAndRepo(req, masterEntry);
+    }
+
+    /**
+     * Forwards the add-brokers request to the cluster master and, when the
+     * master reports success, stores the new brokers through the node service.
+     *
+     * <p>Every outcome is returned as a JSON-serialized {@link TubeMQResult}:
+     * a rejection by the master, an unreadable response and any exception
+     * raised while talking to the master are all turned into an error result.
+     *
+     * @param req         the add-brokers request
+     * @param masterEntry the master node of the target cluster
+     * @return the outcome as a JSON string
+     * @throws Exception if the request cannot be converted into a query string
+     */
+    private String addToMasterAndRepo(AddBrokersReq req, NodeEntry masterEntry)
+            throws Exception {
+
+        String masterApi = SCHEMA + masterEntry.getIp() + ":" + masterEntry.getWebPort()
+                + "/" + TUBE_REQUEST_PATH;
+        String url = masterApi + "?" + convertReqToQueryStr(req);
+
+        // The query string carries confModAuthToken, so only the endpoint is logged.
+        log.info("start to request {}", masterApi);
+        HttpGet httpGet = new HttpGet(url);
+        TubeMQResult defaultResult = new TubeMQResult();
+
+        try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
+            TubeHttpResponse result = gson.fromJson(
+                    new InputStreamReader(response.getEntity().getContent(),
+                            java.nio.charset.StandardCharsets.UTF_8),
+                    TubeHttpResponse.class);
+            if (result == null) {
+                // Gson returns null for an empty body.
+                defaultResult.setErrCode(-1);
+                defaultResult.setResult(false);
+                defaultResult.setErrMsg("empty response from master");
+            } else if (result.getCode() == 0 && result.getErrCode() == 0) {
+                // save brokers to db when success
+                saveAllBrokers(req.getBrokerJsonSet(), req.getClusterId(), masterEntry);
+            } else {
+                // Report the master's message in the same JSON envelope as
+                // every other path instead of returning the bare string.
+                defaultResult.setErrCode(-1);
+                defaultResult.setResult(false);
+                defaultResult.setErrMsg(result.getErrMsg());
+            }
+        } catch (Exception ex) {
+            log.error("exception caught while requesting broker status", ex);
+            defaultResult.setErrCode(-1);
+            defaultResult.setResult(false);
+            defaultResult.setErrMsg(ex.getMessage());
+        }
+
+        return gson.toJson(defaultResult);
+    }
+
+
+
+
+    /**
+     * Creates one broker {@link NodeEntry} per broker configuration and hands
+     * the whole batch to the node service for saving.
+     *
+     * @param brokerConfList broker configurations taken from the request
+     * @param clusterId      id of the cluster the brokers are added to
+     * @param masterEntry    master node whose cluster name is copied to each broker
+     */
+    private void saveAllBrokers(List<BrokerConf> brokerConfList,
+                                int clusterId, NodeEntry masterEntry) {
+        List<NodeEntry> brokers = new ArrayList<>();
+        for (BrokerConf conf : brokerConfList) {
+            brokers.add(toBrokerEntry(conf, clusterId, masterEntry));
+        }
+        nodeService.saveNodes(brokers);
+    }
+
+    /**
+     * Maps a single broker configuration to a non-master, non-standby broker node.
+     */
+    private NodeEntry toBrokerEntry(BrokerConf conf, int clusterId, NodeEntry masterEntry) {
+        NodeEntry entry = new NodeEntry();
+        // role flags
+        entry.setBroker(true);
+        entry.setMaster(false);
+        entry.setStandby(false);
+        // cluster membership
+        entry.setClusterId(clusterId);
+        entry.setClusterName(masterEntry.getClusterName());
+        // broker identity and address
+        entry.setBrokerId(conf.getBrokerId());
+        entry.setIp(conf.getBrokerIp());
+        entry.setPort(conf.getBrokerPort());
+        return entry;
+    }
+
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
new file mode 100644
index 0000000..13d10c4
--- /dev/null
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddBrokersReq.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller.node.request;
+
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Request body of the add-brokers endpoint.
+ *
+ * <p>The controller passes the whole object to
+ * {@code ConvertUtils.convertReqToQueryStr}, which emits every declared field
+ * as a query parameter, so the field names below are also the parameter names
+ * of the request sent to the master.
+ */
+@Data
+public class AddBrokersReq {
+
+    /**
+     * Authorization token; the controller rejects the request when it is blank.
+     */
+    public String confModAuthToken;
+
+    /**
+     * Presumably the user on whose behalf the brokers are created - TODO confirm.
+     */
+    public String createUser;
+
+    /**
+     * Id of the target cluster; used to look up the cluster's master node.
+     */
+    public int clusterId;
+
+    /**
+     * Name of the master API method to invoke, e.g.
+     * admin_bath_add_broker_configure
+     */
+    public String method;
+
+    /**
+     * Type of the master API call, e.g.
+     * op_modify
+     */
+    public String type;
+
+    /**
+     * Brokers to add; serialized as a JSON array in the query string and
+     * saved as node entries once the master reports success.
+     */
+    public List<BrokerConf> brokerJsonSet;
+
+}
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
new file mode 100644
index 0000000..fc48d47
--- /dev/null
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BrokerConf.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller.node.request;
+
+
+import lombok.Data;
+
+import java.security.SecureRandom;
+
+/**
+ * Configuration of a single broker inside an add-brokers request.
+ *
+ * <p>Instances are bound from the request JSON and later serialized back to
+ * JSON for the master, so each field name doubles as the JSON key. Two names
+ * that had been mangled by an "Int" to "Integer" replacement are restored
+ * here: {@code unflushInterval} (the key the request payload actually uses)
+ * and {@code memCacheFlushIntvl}.
+ */
+@Data
+public class BrokerConf {
+
+    public String brokerIp;
+    public Integer brokerPort;
+    public Integer brokerId;
+    public String deleteWhen;
+    public Integer numPartitions;
+    public Integer unflushThreshold;
+    public Integer unflushInterval;
+    public Integer unflushDataHold;
+    public boolean acceptPublish;
+    public boolean acceptSubscribe;
+    public String createUser;
+    public Integer brokerTLSPort;
+    public Integer numTopicStores;
+    public Integer memCacheMsgCntInK;
+    public Integer memCacheMsgSizeInMB;
+    public Integer memCacheFlushIntvl;
+
+}
+
+
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index 2e819cc..a9d39b3 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -292,6 +292,14 @@ public class NodeService {
     }
 
 
+
+    /**
+     * Saves the given node entries by passing them to
+     * {@code nodeRepository.saveAll}.
+     *
+     * @param nodes the node entries to save
+     */
+    public void saveNodes(List<NodeEntry> nodes) {
+        nodeRepository.saveAll(nodes);
+    }
+
+
+
+
     public void close() throws IOException {
+        // Delegates to httpclient.close(); an IOException propagates to the caller.
         httpclient.close();
     }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
new file mode 100644
index 0000000..126b347
--- /dev/null
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/utils/ConvertUtils.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.utils;
+
+import com.google.gson.Gson;
+import org.apache.commons.lang3.StringUtils;
+
+import java.lang.reflect.Field;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Helpers for turning request objects into URL query strings.
+ */
+public class ConvertUtils {
+
+    public static Gson gson = new Gson();
+
+    /**
+     * Converts the fields declared on the class of {@code req} into a URL
+     * query string of the form {@code name1=value1&name2=value2}.
+     *
+     * <p>{@link List} values are rendered as JSON, every other value through
+     * {@code toString()}; values are URL-encoded as UTF-8. Fields whose value
+     * is {@code null} are left out, as are static and synthetic fields, so
+     * that unset optional values and fields injected by tooling do not end up
+     * in the query.
+     *
+     * @param req the request object to convert
+     * @return the query string, without a leading {@code ?}
+     * @throws Exception if a field cannot be read or a value cannot be encoded
+     */
+    public static String convertReqToQueryStr(Object req) throws Exception {
+        List<String> queryList = new ArrayList<>();
+        Class<?> clz = req.getClass();
+        for (Field field : clz.getDeclaredFields()) {
+            if (field.isSynthetic()
+                    || java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
+                // not part of the request data
+                continue;
+            }
+            field.setAccessible(true);
+            Object o = field.get(req);
+            if (o == null) {
+                // an unset value has nothing to send; calling toString() on it would throw
+                continue;
+            }
+            // convert list to json string
+            String value = (o instanceof List) ? gson.toJson(o) : o.toString();
+            queryList.add(field.getName() + "=" + URLEncoder.encode(
+                    value, UTF_8.toString()));
+        }
+        return StringUtils.join(queryList, "&");
+    }
+}
diff --git 
a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
 
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
index 0dbec60..1b3c0ed 100644
--- 
a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
+++ 
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
@@ -96,4 +96,6 @@ public class TestBusinessController {
         
assertThat(Objects.requireNonNull(responseEntity.getBody()).getErrCode()).isEqualTo(-1);
         assertTrue(responseEntity.getBody().getErrMsg().contains("exception 
for test"));
     }
+
+
 }
diff --git 
a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
 
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
index 0b8958e..935bb62 100644
--- 
a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
+++ 
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestNodeController.java
@@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import 
org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.http.MediaType;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.MvcResult;
@@ -37,6 +38,7 @@ import org.springframework.test.web.servlet.RequestBuilder;
 import java.util.List;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
 import static 
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
@@ -94,6 +96,14 @@ public class TestNodeController {
                     "\"standby\":[],\"broker\":[]}}]," +
                     "\"errMsg\":\"\",\"errCode\":0,\"result\":true}";
 
+    /**
+     * Builds the master node fixture used by the add-brokers test:
+     * a master entry at 127.0.0.1 with web port 8084.
+     */
+    private NodeEntry getNodeEntry() {
+        final NodeEntry master = new NodeEntry();
+        master.setIp("127.0.0.1");
+        master.setWebPort(8084);
+        master.setMaster(true);
+        return master;
+    }
+
     @Test
     public void testClusterInfo() throws Exception {
         List<NodeEntry> nodeEntries = getOneNodeEntry();
@@ -123,6 +133,39 @@ public class TestNodeController {
                 result.getResponse().getContentType());
     }
 
-
-
-}
+    /**
+     * Posts an add-brokers request to /v1/node/add with the repository mocked
+     * to return a master entry, and logs the response.
+     *
+     * <p>NOTE(review): the test only logs the response and asserts nothing;
+     * consider asserting on the returned JSON.
+     */
+    @Test
+    public void testAddBrokersToCluster() throws Exception {
+        String jsonStr = "{\n" +
+                "\t\"confModAuthToken\": \"abc\",\n" +
+                "\t\"createUser\": \"test\",\n" +
+                "\t\"clusterId\": 1,\n" +
+                "\t\"method\": \"admin_bath_add_broker_configure\",\n" +
+                "\t\"type\": \"op_modify\",\n" +
+                "\t\"brokerJsonSet\": [{\n" +
+                "\t\t\"brokerId\": 234,\n" +
+                "\t\t\"brokerIp\": \"127.0.0.1\",\n" +
+                "\t\t\"brokerPort\": 8124,\n" +
+                "\t\t\"numPartitions\": 3,\n" +
+                "\t\t\"unflushThreshold\": 55,\n" +
+                "\t\t\"unflushInterval\": 10000,\n" +
+                "\t\t\"deleteWhen\": \"0 0 6,18 * * ?\",\n" +
+                "\t\t\"deletePolicy\": \"delete,168\",\n" +
+                "\t\t\"acceptPublish\": \"true\",\n" +
+                "\t\t\"acceptSubscribe\": \"true\",\n" +
+                "\t\t\"createUser\": \"gosonzhang\",\n" +
+                "\t\t\"createDate\": \"20151116142135\",\n" +
+                "\t\t\"modifyUser\": \"gosonzhang\",\n" +
+                "\t\t\"modifyDate\": \"20151117161515\"\n" +
+                "\t}]\n" +
+                "\n" +
+                "}";
+        NodeEntry nodeEntry = getNodeEntry();
+        doReturn(nodeEntry).when(nodeRepository)
+                .findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class));
+        RequestBuilder request = post("/v1/node/add")
+                .contentType(MediaType.APPLICATION_JSON).content(jsonStr);
+        MvcResult result = mockMvc.perform(request).andReturn();
+        String resultStr = result.getResponse().getContentAsString();
+        log.info("result json string is {}, response type is {}", resultStr,
+                result.getResponse().getContentType());
+    }
+    }

Reply via email to