This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-336
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-336 by this push:
new 9e1ea11 [TUBEMQ-392] add query rest api for clusters (#307)
9e1ea11 is described below
commit 9e1ea11f5703f52dd4074fa5a60d5d6fd1b80820
Author: Yuanbo Liu <[email protected]>
AuthorDate: Thu Nov 5 10:11:09 2020 +0800
[TUBEMQ-392] add query rest api for clusters (#307)
---
.../org/apache/tubemq/manager/TubeMQManager.java | 2 +-
.../controller/ManagerControllerAdvice.java | 21 ++--
.../{topic/TopicResult.java => TubeResult.java} | 13 +--
.../controller/cluster/ClusterController.java | 100 +++++++++++++++++
.../manager/controller/topic/TopicController.java | 23 ++--
.../manager/controller/TestBusinessController.java | 14 +--
.../manager/controller/TestClusterController.java | 121 +++++++++++++++++++++
7 files changed, 255 insertions(+), 39 deletions(-)
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java
index a25897b..114c0bc 100644
--- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java
+++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java
@@ -42,7 +42,7 @@ public class TubeMQManager {
@Value("${manager.async.thread.prefix:AsyncThread-}")
private String threadPrefix;
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
SpringApplication.run(TubeMQManager.class);
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
index 09a72cd..5053834 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java
@@ -16,9 +16,6 @@
*/
package org.apache.tubemq.manager.controller;
-import javax.servlet.http.HttpServletRequest;
-import org.apache.tubemq.manager.controller.topic.TopicResult;
-import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
@@ -29,18 +26,16 @@ import
org.springframework.web.bind.annotation.RestControllerAdvice;
public class ManagerControllerAdvice {
/**
- * handling business TubeMQManagerException, and return json format string.
+ * handling exception, and return json format string.
*
- * @param request - http request
- * @param ex - exception
- * @return entity
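+ * @param ex - exception raised while a controller handled the request
+ * @return result entity carrying the error code and error message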
*/
- @ExceptionHandler(TubeMQManagerException.class)
- public TopicResult handlingBusinessException(HttpServletRequest request,
- TubeMQManagerException ex) {
- TopicResult result = new TopicResult();
- result.setMessage(ex.getMessage());
- result.setCode(-1);
+ // Converts any exception escaping a controller into a TubeResult:
+ // errMsg is the exception class name followed by its message, errCode is -1.
+ @ExceptionHandler(Exception.class)
+ public TubeResult handlingParameterException(Exception ex) {
+ TubeResult result = new TubeResult();
+ result.setErrMsg(ex.getClass().getName() + " " + ex.getMessage());
+ result.setErrCode(-1);
+ // TubeResult defaults "result" to true, so flag the failure explicitly
+ result.setResult(false);
return result;
}
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeResult.java
similarity index 82%
rename from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java
rename to
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeResult.java
index 98fb81e..144d975 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicResult.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/TubeResult.java
@@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tubemq.manager.controller.topic;
+
+package org.apache.tubemq.manager.controller;
import lombok.Data;
-/**
- * rest result for business controller
- */
@Data
-public class TopicResult {
- private String message;
- private int code = 0;
+public class TubeResult {
+ private String errMsg;
+ private int errCode = 0;
+ private boolean result = true;
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
new file mode 100644
index 0000000..599ce15
--- /dev/null
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/cluster/ClusterController.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller.cluster;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.tubemq.manager.service.TubeHttpConst.SCHEMA;
+
+import com.google.gson.Gson;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.tubemq.manager.controller.TubeResult;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * REST controller that forwards query requests to the master node of a
+ * cluster and relays the master's answer to the caller.
+ */
+@RestController
+@RequestMapping(path = "/v1/cluster")
+@Slf4j
+public class ClusterController {
+
+    private static final String TUBE_REQUEST_PATH = "webapi.htm";
+    private static final String CLUSTER_ID = "clusterId";
+
+    private final CloseableHttpClient httpclient = HttpClients.createDefault();
+    private final Gson gson = new Gson();
+
+    @Autowired
+    private NodeRepository nodeRepository;
+
+    /**
+     * Builds a URL query string ("k1=v1&k2=v2") from the given parameters.
+     * Both keys and values are URL-encoded with UTF-8; a null value is
+     * written as an empty string.
+     *
+     * @param requestMap - parameters to forward
+     * @return encoded query string, empty when the map is empty
+     * @throws Exception - if the encoding is rejected by URLEncoder
+     */
+    private String convertMapToQueryString(Map<String, String> requestMap)
+            throws Exception {
+        String charset = UTF_8.toString();
+        List<String> queryList = new ArrayList<>();
+        for (Map.Entry<String, String> entry : requestMap.entrySet()) {
+            String value = StringUtils.defaultString(entry.getValue());
+            queryList.add(URLEncoder.encode(entry.getKey(), charset) + "="
+                    + URLEncoder.encode(value, charset));
+        }
+        return StringUtils.join(queryList, "&");
+    }
+
+    /**
+     * Serializes a failed TubeResult (errCode -1, result false) to json.
+     *
+     * @param errMsg - message describing the failure
+     * @return json string of the failed result
+     */
+    private String errorJson(String errMsg) {
+        TubeResult failure = new TubeResult();
+        failure.setErrCode(-1);
+        failure.setResult(false);
+        failure.setErrMsg(errMsg);
+        return gson.toJson(failure);
+    }
+
+    /**
+     * Sends a GET request to the given url and returns the response body.
+     * Any failure, including a malformed url, is logged and reported as a
+     * failed TubeResult json instead of being thrown.
+     *
+     * @param url - full url of the master web api
+     * @return response body, or json of a failed TubeResult
+     */
+    private String queryMaster(String url) {
+        log.info("start to request {}", url);
+        // HttpGet is created inside the try so that an invalid url is
+        // handled by the same catch as a failed request
+        try (CloseableHttpResponse response = httpclient.execute(new HttpGet(url))) {
+            // UTF_8 is only the fallback when the response declares no charset
+            return EntityUtils.toString(response.getEntity(), UTF_8);
+        } catch (Exception ex) {
+            log.error("exception caught while requesting broker status", ex);
+            return errorJson(ex.getMessage());
+        }
+    }
+
+    /**
+     * Forwards a query to the master of the cluster named by the
+     * "clusterId" parameter; every other parameter is passed on unchanged.
+     *
+     * @param queryBody - request parameters, must contain a numeric clusterId
+     * @return json string answered by the master, or a failed TubeResult json
+     * @throws Exception - NumberFormatException when clusterId is missing
+     *     or not a number, or an encoding failure of the parameters
+     */
+    @RequestMapping(value = "/query", method = RequestMethod.GET,
+            produces = MediaType.APPLICATION_JSON_VALUE)
+    public @ResponseBody String queryInfo(
+            @RequestParam Map<String, String> queryBody) throws Exception {
+        int clusterId = Integer.parseInt(queryBody.get(CLUSTER_ID));
+        // clusterId only selects the master, it is not forwarded
+        queryBody.remove(CLUSTER_ID);
+        NodeEntry nodeEntry =
+                nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(clusterId);
+        // NOTE(review): assumes the repository yields null when no master
+        // is registered for the cluster - confirm against NodeRepository
+        if (nodeEntry == null) {
+            return errorJson("no master node found for cluster " + clusterId);
+        }
+        String url = SCHEMA + nodeEntry.getIp() + ":" + nodeEntry.getWebPort()
+                + "/" + TUBE_REQUEST_PATH + "?" + convertMapToQueryString(queryBody);
+        return queryMaster(url);
+    }
+
+}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
index 314d079..fdeac4e 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.TubeResult;
import org.apache.tubemq.manager.entry.TopicEntry;
import org.apache.tubemq.manager.entry.TopicStatus;
import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
@@ -53,7 +54,7 @@ public class TopicController {
* @throws Exception - exception
*/
@PostMapping("/add")
- public TopicResult addTopic(@RequestBody TopicEntry entry) {
+ public TubeResult addTopic(@RequestBody TopicEntry entry) {
// entry in adding status
entry.setStatus(TopicStatus.ADDING.value());
topicRepository.saveAndFlush(entry);
@@ -68,7 +69,7 @@ public class TopicController {
}
topicRepository.saveAndFlush(entry1);
});
- return new TopicResult();
+ return new TubeResult();
}
/**
@@ -78,8 +79,8 @@ public class TopicController {
* @throws Exception
*/
@PostMapping("/update")
- public TopicResult updateTopic(@RequestBody TopicEntry entry) {
- return new TopicResult();
+ public TubeResult updateTopic(@RequestBody TopicEntry entry) {
+ return new TubeResult();
}
/**
@@ -89,10 +90,10 @@ public class TopicController {
* @throws Exception
*/
@GetMapping("/check")
- public TopicResult checkTopicByBusinessName(
+ public TubeResult checkTopicByBusinessName(
@RequestParam String businessName) {
List<TopicEntry> result =
topicRepository.findAllByBusinessName(businessName);
- return new TopicResult();
+ return new TubeResult();
}
/**
@@ -103,13 +104,13 @@ public class TopicController {
* @throws Exception
*/
@GetMapping("/get/{id}")
- public TopicResult getBusinessByID(
+ public TubeResult getBusinessByID(
@PathVariable Long id) {
Optional<TopicEntry> businessEntry = topicRepository.findById(id);
- TopicResult result = new TopicResult();
+ TubeResult result = new TubeResult();
if (!businessEntry.isPresent()) {
- result.setCode(-1);
- result.setMessage("business not found");
+ result.setErrCode(-1);
+ result.setErrMsg("business not found");
}
return result;
}
@@ -119,7 +120,7 @@ public class TopicController {
* @return
*/
@GetMapping("/throwException")
- public TopicResult throwException() {
+ public TubeResult throwException() {
throw new TubeMQManagerException("exception for test");
}
}
diff --git
a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
index 0838203..9a497cf 100644
---
a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
+++
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
@@ -20,7 +20,6 @@ import java.net.URI;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.tubemq.manager.controller.topic.TopicController;
-import org.apache.tubemq.manager.controller.topic.TopicResult;
import org.apache.tubemq.manager.entry.TopicEntry;
import org.junit.Before;
import org.junit.Test;
@@ -40,6 +39,7 @@ import org.springframework.test.web.servlet.RequestBuilder;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
import static
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@@ -82,8 +82,8 @@ public class TestBusinessController {
HttpHeaders headers = new HttpHeaders();
HttpEntity<TopicEntry> request = new HttpEntity<>(entry, headers);
- ResponseEntity<TopicResult> responseEntity =
- client.postForEntity(uri, request, TopicResult.class);
+ ResponseEntity<TubeResult> responseEntity =
+ client.postForEntity(uri, request, TubeResult.class);
assertThat(responseEntity.getStatusCode().is2xxSuccessful()).isEqualTo(true);
}
@@ -91,9 +91,9 @@ public class TestBusinessController {
public void testControllerException() throws Exception {
final String baseUrl = "http://localhost:" + randomServerPort +
"/business/throwException";
URI uri = new URI(baseUrl);
- ResponseEntity<TopicResult> responseEntity =
- client.getForEntity(uri, TopicResult.class);
-
assertThat(Objects.requireNonNull(responseEntity.getBody()).getCode()).isEqualTo(-1);
- assertThat(responseEntity.getBody().getMessage()).isEqualTo("exception
for test");
+ ResponseEntity<TubeResult> responseEntity =
+ client.getForEntity(uri, TubeResult.class);
+
assertThat(Objects.requireNonNull(responseEntity.getBody()).getErrCode()).isEqualTo(-1);
+ assertTrue(responseEntity.getBody().getErrMsg().contains("exception
for test"));
}
}
diff --git
a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java
new file mode 100644
index 0000000..c31fb8d
--- /dev/null
+++
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestClusterController.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tubemq.manager.controller;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+import static
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.cluster.ClusterController;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.springframework.beans.factory.annotation.Autowired;
+import
org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.MvcResult;
+import org.springframework.test.web.servlet.RequestBuilder;
+
+/**
+ * Tests for the "/v1/cluster/query" endpoint, run through MockMvc with the
+ * NodeRepository replaced by a mock bean.
+ */
+@Slf4j
+@RunWith(SpringRunner.class)
+@SpringBootTest
+@AutoConfigureMockMvc
+public class TestClusterController {
+
+    private final Gson gson = new Gson();
+
+    @MockBean
+    private NodeRepository nodeRepository;
+
+    @InjectMocks
+    private ClusterController clusterController;
+
+    @Autowired
+    private MockMvc mockMvc;
+
+    /**
+     * Creates the master node entry handed out by the mocked repository.
+     */
+    private NodeEntry masterNodeEntry() {
+        NodeEntry master = new NodeEntry();
+        master.setMaster(true);
+        master.setIp("10.215.128.83");
+        master.setWebPort(8080);
+        return master;
+    }
+
+    /**
+     * Stubs the repository with the master node entry and performs a GET
+     * request against the given uri.
+     */
+    private MvcResult performQuery(String uri) throws Exception {
+        NodeEntry master = masterNodeEntry();
+        when(nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(any(Integer.class)))
+                .thenReturn(master);
+        RequestBuilder request = get(uri);
+        return mockMvc.perform(request).andReturn();
+    }
+
+    /**
+     * Logs the body and the content type of the given response.
+     */
+    private void logResponse(MvcResult mvcResult) throws Exception {
+        String body = mvcResult.getResponse().getContentAsString();
+        log.info("result json string is {}, response type is {}", body,
+                mvcResult.getResponse().getContentType());
+    }
+
+    /**
+     * A query without clusterId must be answered with errCode -1 and an
+     * error message naming NumberFormatException.
+     */
+    @Test
+    public void testExceptionQuery() throws Exception {
+        MvcResult mvcResult = performQuery(
+                "/v1/cluster/query?method=admin_query_topic_info&type=op_query");
+        String body = mvcResult.getResponse().getContentAsString();
+        TubeResult clusterResult = gson.fromJson(body, TubeResult.class);
+        Assert.assertEquals(-1, clusterResult.getErrCode());
+        Assert.assertTrue(clusterResult.getErrMsg().contains("NumberFormatException"));
+    }
+
+    /**
+     * Issues a topic info query; the response is only logged, nothing is asserted.
+     */
+    @Test
+    public void testTopicQuery() throws Exception {
+        logResponse(performQuery(
+                "/v1/cluster/query?method=admin_query_topic_info&type=op_query&clusterId=1"));
+    }
+
+    /**
+     * Issues a broker run status query; the response is only logged, nothing is asserted.
+     */
+    @Test
+    public void testBrokerQuery() throws Exception {
+        logResponse(performQuery(
+                "/v1/cluster/query?method=admin_query_broker_run_status&type=op_query&clusterId=1&brokerIp="));
+    }
+
+    /**
+     * Issues a subscription info query; the response is only logged, nothing is asserted.
+     */
+    @Test
+    public void testTopicAndGroupQuery() throws Exception {
+        logResponse(performQuery(
+                "/v1/cluster/query?method=admin_query_sub_info&type=op_query&clusterId=1&topicName=test&groupName=test"));
+    }
+
+}