This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
new 1feb259 [TUBEMQ-487] add new feature clone topic + batch add topics
(#375)
1feb259 is described below
commit 1feb25978ca2e69e38a12bebd245fed68a18bdf5
Author: EMsnap <[email protected]>
AuthorDate: Tue Dec 29 02:38:42 2020 -0800
[TUBEMQ-487] add new feature clone topic + batch add topics (#375)
* [TUBEMQ-487] add new feature clone topic + batch add topics
* [TUBEMQ-487] make sure brokers are configurable
---
.../controller/node/request/AddTopicReq.java | 2 +-
.../{AddTopicReq.java => BatchAddTopicReq.java} | 20 ++---
.../{AddTopicReq.java => CloneTopicReq.java} | 20 ++---
...opicController.java => TopicTdmController.java} | 2 +-
.../controller/topic/TopicWebController.java | 100 +++++++++++++++++++++
.../apache/tubemq/manager/service/NodeService.java | 34 ++++++-
.../tubemq/manager/service/TubeMQHttpConst.java | 1 +
.../service/tube/TubeHttpBrokerInfoList.java | 10 +++
.../service/tube/TubeHttpTopicInfoList.java | 38 ++++++++
.../manager/controller/TestBusinessController.java | 4 +-
10 files changed, 196 insertions(+), 35 deletions(-)
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
index 3eb73ab..1753be6 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
@@ -27,7 +27,6 @@ public class AddTopicReq extends BaseReq {
public Integer unflushThreshold;
public Boolean acceptPublish;
public Integer numPartitions;
- public String deletePolicy;
public Integer unflushInterval;
public Boolean acceptSubscribe;
public String method;
@@ -35,4 +34,5 @@ public class AddTopicReq extends BaseReq {
public String brokerId;
public String confModAuthToken;
public String topicName;
+ public String deletePolicy;
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
similarity index 66%
copy from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
copy to
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
index 3eb73ab..0201643 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
@@ -17,22 +17,12 @@
package org.apache.tubemq.manager.controller.node.request;
-
+import java.util.List;
import lombok.Data;
@Data
-public class AddTopicReq extends BaseReq {
- public String createUser;
- public String deleteWhen;
- public Integer unflushThreshold;
- public Boolean acceptPublish;
- public Integer numPartitions;
- public String deletePolicy;
- public Integer unflushInterval;
- public Boolean acceptSubscribe;
- public String method;
- public String type;
- public String brokerId;
- public String confModAuthToken;
- public String topicName;
+public class BatchAddTopicReq {
+ List<AddTopicReq> addTopicReqs;
+ List<Integer> brokerIds;
+ Integer clusterId;
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneTopicReq.java
similarity index 68%
copy from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
copy to
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneTopicReq.java
index 3eb73ab..f577b50 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneTopicReq.java
@@ -17,22 +17,14 @@
package org.apache.tubemq.manager.controller.node.request;
-
+import java.util.List;
import lombok.Data;
@Data
-public class AddTopicReq extends BaseReq {
- public String createUser;
- public String deleteWhen;
- public Integer unflushThreshold;
- public Boolean acceptPublish;
- public Integer numPartitions;
- public String deletePolicy;
- public Integer unflushInterval;
- public Boolean acceptSubscribe;
- public String method;
- public String type;
- public String brokerId;
+public class CloneTopicReq extends BaseReq{
+ public String sourceTopicName;
+ public Integer clusterId;
+ public List<Integer> brokerId;
+ public List<String> targetTopicName;
public String confModAuthToken;
- public String topicName;
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicTdmController.java
similarity index 99%
rename from
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
rename to
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicTdmController.java
index 983579f..7b5deb0 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicTdmController.java
@@ -39,7 +39,7 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(path = "/business")
@Slf4j
-public class TopicController {
+public class TopicTdmController {
@Autowired
private TopicRepository topicRepository;
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
new file mode 100644
index 0000000..005ab9a
--- /dev/null
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tubemq.manager.controller.topic;
+
+import com.google.gson.Gson;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.entry.TopicEntry;
+import org.apache.tubemq.manager.entry.TopicStatus;
+import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.repository.TopicRepository;
+import org.apache.tubemq.manager.service.NodeService;
+import org.apache.tubemq.manager.service.TopicBackendWorker;
+import org.apache.tubemq.manager.service.TopicFuture;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping(path = "/v1/topic")
+@Slf4j
+public class TopicWebController {
+
+ @Autowired
+ private TopicRepository topicRepository;
+
+ @Autowired
+ private TopicBackendWorker topicBackendWorker;
+
+ @Autowired
+ private NodeService nodeService;
+
+ @Autowired
+ private NodeRepository nodeRepository;
+
+ public Gson gson = new Gson();
+
+ /**
+ * add topic to brokers
+ * @param req
+ * @return
+ */
+ @PostMapping("/add")
+ public TubeMQResult addTopic(@RequestBody BatchAddTopicReq req) {
+ if (req.getClusterId() == null)
+ return TubeMQResult.getErrorResult("please input clusterId");
+ NodeEntry masterEntry =
nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+ req.getClusterId());
+ if (masterEntry == null)
+ return TubeMQResult.getErrorResult("no such cluster");
+ return nodeService.addTopicsToBrokers(masterEntry, req.getBrokerIds(),
req.getAddTopicReqs());
+ }
+
+ /**
+ * given one topic, copy its config and clone to brokers
+ * if no broker is is provided, topics will be cloned to all brokers in
cluster
+ * @param req
+ * @return
+ * @throws Exception
+ */
+ @PostMapping("/clone")
+ public TubeMQResult cloneTopic(@RequestBody CloneTopicReq req) throws
Exception {
+ if (req.getClusterId() == null)
+ return TubeMQResult.getErrorResult("please input clusterId");
+ NodeEntry masterEntry =
nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+ req.getClusterId());
+ if (masterEntry == null)
+ return TubeMQResult.getErrorResult("no such cluster");
+ return nodeService.cloneTopicToBrokers(req, masterEntry);
+ }
+
+}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index ac60aee..1b15770 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -45,10 +45,12 @@ import org.apache.tubemq.manager.controller.TubeMQResult;
import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
import org.apache.tubemq.manager.entry.NodeEntry;
import org.apache.tubemq.manager.repository.NodeRepository;
import org.apache.tubemq.manager.service.tube.*;
+import
org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList.BrokerInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@@ -146,10 +148,10 @@ public class NodeService {
List<AddTopicReq> addTopicReqs = req.getAddTopicReqs();
// 4. add topic to brokers
- return addTopicToBrokers(masterEntry, brokerIds, addTopicReqs);
+ return addTopicsToBrokers(masterEntry, brokerIds, addTopicReqs);
}
- private TubeMQResult addTopicToBrokers(NodeEntry masterEntry,
List<Integer> brokerIds, List<AddTopicReq> addTopicReqs) {
+ public TubeMQResult addTopicsToBrokers(NodeEntry masterEntry,
List<Integer> brokerIds, List<AddTopicReq> addTopicReqs) {
TubeMQResult tubeResult = new TubeMQResult();
AddTopicsResult addTopicsResult = new AddTopicsResult();
@@ -413,4 +415,32 @@ public class NodeService {
}
return null;
}
+
+ public TubeMQResult cloneTopicToBrokers(CloneTopicReq req, NodeEntry
master) throws Exception {
+
+ // 1 query topic config
+ TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master,
req.getSourceTopicName());
+
+ if (topicInfoList == null) {
+ return TubeMQResult.getErrorResult("no such topic");
+ }
+
+ // 2 if there's no specific broker ids then clone to all of the brokers
+ List<Integer> brokerId = req.getBrokerId();
+
+ if (CollectionUtils.isEmpty(brokerId)) {
+ TubeHttpBrokerInfoList brokerInfoList =
requestClusterNodeStatus(master);
+ if (brokerInfoList != null) {
+ brokerId = brokerInfoList.getConfigurableBrokerIdList();
+ }
+ }
+
+ // 3 generate add topic req
+ AddTopicReq addTopicReq = topicInfoList.getAddTopicReq(brokerId,
+ req.getTargetTopicName(), req.getConfModAuthToken());
+
+ // 4 send to master
+ return addTopicToBrokers(addTopicReq, master);
+
+ }
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
index 71be4ec..0fa83d6 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
@@ -32,4 +32,5 @@ public class TubeMQHttpConst {
public static final String OP_MODIFY = "op_modify";
public static final String BATCH_ADD_BROKER =
"admin_bath_add_broker_configure";
public static final String WEB_API = "webapi";
+ public static final String BATCH_ADD_TOPIC = "admin_add_new_topic_record";
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
index c768aa1..83767ce 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
@@ -132,4 +132,14 @@ public class TubeHttpBrokerInfoList {
return tmpBrokerIdList;
}
+ public List<Integer> getAllBrokerIdList() {
+ List<Integer> tmpBrokerIdList = new ArrayList<>();
+ if (data != null) {
+ for (BrokerInfo brokerInfo : data) {
+ tmpBrokerIdList.add(brokerInfo.getBrokerId());
+ }
+ }
+ return tmpBrokerIdList;
+ }
+
}
diff --git
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
index 7131b83..bff5a7b 100644
---
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
+++
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
@@ -17,9 +17,16 @@
package org.apache.tubemq.manager.service.tube;
+import static
org.apache.tubemq.manager.service.TubeMQHttpConst.BATCH_ADD_TOPIC;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_MODIFY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.WEB_API;
+
import java.util.ArrayList;
import java.util.List;
import lombok.Data;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
import
org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
/**
@@ -94,4 +101,35 @@ public class TubeHttpTopicInfoList {
}
return tmpBrokerIdList;
}
+
+ public AddTopicReq getAddTopicReq(List<Integer> brokerIds, List<String>
targetTopicNames, String token) {
+
+ AddTopicReq req = new AddTopicReq();
+ TopicInfoList topicInfoList = data.get(0);
+ if (topicInfoList == null) {
+ return req;
+ }
+ List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
+ if (CollectionUtils.isEmpty(topicInfos)) {
+ return req;
+ }
+
+ TopicInfo topicInfo = topicInfos.get(0);
+ String brokerStr = StringUtils.join(brokerIds, ",");
+ String topic = StringUtils.join(targetTopicNames, ",");
+
+ req.setBrokerId(brokerStr);
+ req.setTopicName(topic);
+ req.setMethod(BATCH_ADD_TOPIC);
+ req.setAcceptPublish(topicInfo.acceptPublish);
+ req.setAcceptSubscribe(topicInfo.acceptSubscribe);
+ req.setType(OP_MODIFY);
+ req.setCreateUser(WEB_API);
+ req.setDeleteWhen(topicInfo.getDeleteWhen());
+ req.setNumPartitions(topicInfo.getNumPartitions());
+ req.setUnflushInterval(topicInfo.getUnflushInterval());
+ req.setConfModAuthToken(token);
+ req.setDeletePolicy(topicInfo.getDeletePolicy());
+ return req;
+ }
}
diff --git
a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
index 1b3c0ed..8b15f3c 100644
---
a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
+++
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
@@ -19,7 +19,7 @@ package org.apache.tubemq.manager.controller;
import java.net.URI;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
-import org.apache.tubemq.manager.controller.topic.TopicController;
+import org.apache.tubemq.manager.controller.topic.TopicTdmController;
import org.apache.tubemq.manager.entry.TopicEntry;
import org.junit.Before;
import org.junit.Test;
@@ -59,7 +59,7 @@ public class TestBusinessController {
@Before
public void setUp() {
- mvc = MockMvcBuilders.standaloneSetup(new TopicController()).build();
+ mvc = MockMvcBuilders.standaloneSetup(new
TopicTdmController()).build();
}
@Test