This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-421 by this push:
     new 1feb259  [TUBEMQ-487] add new feature clone topic + batch add topics 
(#375)
1feb259 is described below

commit 1feb25978ca2e69e38a12bebd245fed68a18bdf5
Author: EMsnap <[email protected]>
AuthorDate: Tue Dec 29 02:38:42 2020 -0800

    [TUBEMQ-487] add new feature clone topic + batch add topics (#375)
    
    * [TUBEMQ-487] add new feature clone topic + batch add topics
    
    * [TUBEMQ-487] make sure brokers are configurable
---
 .../controller/node/request/AddTopicReq.java       |   2 +-
 .../{AddTopicReq.java => BatchAddTopicReq.java}    |  20 ++---
 .../{AddTopicReq.java => CloneTopicReq.java}       |  20 ++---
 ...opicController.java => TopicTdmController.java} |   2 +-
 .../controller/topic/TopicWebController.java       | 100 +++++++++++++++++++++
 .../apache/tubemq/manager/service/NodeService.java |  34 ++++++-
 .../tubemq/manager/service/TubeMQHttpConst.java    |   1 +
 .../service/tube/TubeHttpBrokerInfoList.java       |  10 +++
 .../service/tube/TubeHttpTopicInfoList.java        |  38 ++++++++
 .../manager/controller/TestBusinessController.java |   4 +-
 10 files changed, 196 insertions(+), 35 deletions(-)

diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
index 3eb73ab..1753be6 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
@@ -27,7 +27,6 @@ public class AddTopicReq extends BaseReq {
     public Integer unflushThreshold;
     public Boolean acceptPublish;
     public Integer numPartitions;
-    public String deletePolicy;
     public Integer unflushInterval;
     public Boolean acceptSubscribe;
     public String method;
@@ -35,4 +34,5 @@ public class AddTopicReq extends BaseReq {
     public String brokerId;
     public String confModAuthToken;
     public String topicName;
+    public String deletePolicy;
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
similarity index 66%
copy from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
copy to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
index 3eb73ab..0201643 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/BatchAddTopicReq.java
@@ -17,22 +17,12 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-
+import java.util.List;
 import lombok.Data;
 
 @Data
-public class AddTopicReq extends BaseReq {
-    public String createUser;
-    public String deleteWhen;
-    public Integer unflushThreshold;
-    public Boolean acceptPublish;
-    public Integer numPartitions;
-    public String deletePolicy;
-    public Integer unflushInterval;
-    public Boolean acceptSubscribe;
-    public String method;
-    public String type;
-    public String brokerId;
-    public String confModAuthToken;
-    public String topicName;
+public class BatchAddTopicReq {
+    List<AddTopicReq> addTopicReqs;
+    List<Integer> brokerIds;
+    Integer clusterId;
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneTopicReq.java
similarity index 68%
copy from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
copy to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneTopicReq.java
index 3eb73ab..f577b50 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/AddTopicReq.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/node/request/CloneTopicReq.java
@@ -17,22 +17,14 @@
 
 package org.apache.tubemq.manager.controller.node.request;
 
-
+import java.util.List;
 import lombok.Data;
 
 @Data
-public class AddTopicReq extends BaseReq {
-    public String createUser;
-    public String deleteWhen;
-    public Integer unflushThreshold;
-    public Boolean acceptPublish;
-    public Integer numPartitions;
-    public String deletePolicy;
-    public Integer unflushInterval;
-    public Boolean acceptSubscribe;
-    public String method;
-    public String type;
-    public String brokerId;
+public class CloneTopicReq extends BaseReq{
+    public String sourceTopicName;
+    public Integer clusterId;
+    public List<Integer> brokerId;
+    public List<String> targetTopicName;
     public String confModAuthToken;
-    public String topicName;
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicTdmController.java
similarity index 99%
rename from 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
rename to 
tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicTdmController.java
index 983579f..7b5deb0 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicController.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicTdmController.java
@@ -39,7 +39,7 @@ import org.springframework.web.bind.annotation.RestController;
 @RestController
 @RequestMapping(path = "/business")
 @Slf4j
-public class TopicController {
+public class TopicTdmController {
 
     @Autowired
     private TopicRepository topicRepository;
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
new file mode 100644
index 0000000..005ab9a
--- /dev/null
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/topic/TopicWebController.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tubemq.manager.controller.topic;
+
+import com.google.gson.Gson;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.tubemq.manager.controller.TubeMQResult;
+import org.apache.tubemq.manager.controller.node.request.BatchAddTopicReq;
+import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
+import org.apache.tubemq.manager.entry.NodeEntry;
+import org.apache.tubemq.manager.entry.TopicEntry;
+import org.apache.tubemq.manager.entry.TopicStatus;
+import org.apache.tubemq.manager.exceptions.TubeMQManagerException;
+import org.apache.tubemq.manager.repository.NodeRepository;
+import org.apache.tubemq.manager.repository.TopicRepository;
+import org.apache.tubemq.manager.service.NodeService;
+import org.apache.tubemq.manager.service.TopicBackendWorker;
+import org.apache.tubemq.manager.service.TopicFuture;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping(path = "/v1/topic")
+@Slf4j
+public class TopicWebController {
+
+    @Autowired
+    private TopicRepository topicRepository;
+
+    @Autowired
+    private TopicBackendWorker topicBackendWorker;
+
+    @Autowired
+    private NodeService nodeService;
+
+    @Autowired
+    private NodeRepository nodeRepository;
+
+    public Gson gson = new Gson();
+
+    /**
+     * add topic to brokers
+     * @param req
+     * @return
+     */
+    @PostMapping("/add")
+    public TubeMQResult addTopic(@RequestBody BatchAddTopicReq req) {
+        if (req.getClusterId() == null)
+            return TubeMQResult.getErrorResult("please input clusterId");
+        NodeEntry masterEntry = 
nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+            req.getClusterId());
+        if (masterEntry == null)
+            return TubeMQResult.getErrorResult("no such cluster");
+        return nodeService.addTopicsToBrokers(masterEntry, req.getBrokerIds(), 
req.getAddTopicReqs());
+    }
+
+    /**
+     * given one topic, copy its config and clone to brokers
+     * if no broker id is provided, topics will be cloned to all brokers in 
cluster
+     * @param req
+     * @return
+     * @throws Exception
+     */
+    @PostMapping("/clone")
+    public TubeMQResult cloneTopic(@RequestBody CloneTopicReq req) throws 
Exception {
+        if (req.getClusterId() == null)
+            return TubeMQResult.getErrorResult("please input clusterId");
+        NodeEntry masterEntry = 
nodeRepository.findNodeEntryByClusterIdIsAndMasterIsTrue(
+            req.getClusterId());
+        if (masterEntry == null)
+            return TubeMQResult.getErrorResult("no such cluster");
+        return nodeService.cloneTopicToBrokers(req, masterEntry);
+    }
+
+}
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
index ac60aee..1b15770 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/NodeService.java
@@ -45,10 +45,12 @@ import org.apache.tubemq.manager.controller.TubeMQResult;
 import org.apache.tubemq.manager.controller.node.request.AddBrokersReq;
 import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
 import org.apache.tubemq.manager.controller.node.request.CloneBrokersReq;
+import org.apache.tubemq.manager.controller.node.request.CloneTopicReq;
 import org.apache.tubemq.manager.controller.node.request.QueryBrokerCfgReq;
 import org.apache.tubemq.manager.entry.NodeEntry;
 import org.apache.tubemq.manager.repository.NodeRepository;
 import org.apache.tubemq.manager.service.tube.*;
+import 
org.apache.tubemq.manager.service.tube.TubeHttpBrokerInfoList.BrokerInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
@@ -146,10 +148,10 @@ public class NodeService {
         List<AddTopicReq> addTopicReqs = req.getAddTopicReqs();
 
         // 4. add topic to brokers
-        return addTopicToBrokers(masterEntry, brokerIds, addTopicReqs);
+        return addTopicsToBrokers(masterEntry, brokerIds, addTopicReqs);
     }
 
-    private TubeMQResult addTopicToBrokers(NodeEntry masterEntry, 
List<Integer> brokerIds, List<AddTopicReq> addTopicReqs) {
+    public TubeMQResult addTopicsToBrokers(NodeEntry masterEntry, 
List<Integer> brokerIds, List<AddTopicReq> addTopicReqs) {
         TubeMQResult tubeResult = new TubeMQResult();
         AddTopicsResult addTopicsResult = new AddTopicsResult();
 
@@ -413,4 +415,32 @@ public class NodeService {
         }
         return null;
     }
+
+    public TubeMQResult cloneTopicToBrokers(CloneTopicReq req, NodeEntry 
master) throws Exception {
+
+        // 1 query topic config
+        TubeHttpTopicInfoList topicInfoList = requestTopicConfigInfo(master, 
req.getSourceTopicName());
+
+        if (topicInfoList == null) {
+            return TubeMQResult.getErrorResult("no such topic");
+        }
+
+        // 2 if there's no specific broker ids then clone to all of the brokers
+        List<Integer> brokerId = req.getBrokerId();
+
+        if (CollectionUtils.isEmpty(brokerId)) {
+            TubeHttpBrokerInfoList brokerInfoList = 
requestClusterNodeStatus(master);
+            if (brokerInfoList != null) {
+                brokerId = brokerInfoList.getConfigurableBrokerIdList();
+            }
+        }
+
+        // 3 generate add topic req
+        AddTopicReq addTopicReq = topicInfoList.getAddTopicReq(brokerId,
+            req.getTargetTopicName(), req.getConfModAuthToken());
+
+        // 4 send to master
+        return addTopicToBrokers(addTopicReq, master);
+
+    }
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
index 71be4ec..0fa83d6 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/TubeMQHttpConst.java
@@ -32,4 +32,5 @@ public class TubeMQHttpConst {
     public static final String OP_MODIFY = "op_modify";
     public static final String BATCH_ADD_BROKER = 
"admin_bath_add_broker_configure";
     public static final String WEB_API = "webapi";
+    public static final String BATCH_ADD_TOPIC = "admin_add_new_topic_record";
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
index c768aa1..83767ce 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpBrokerInfoList.java
@@ -132,4 +132,14 @@ public class TubeHttpBrokerInfoList {
         return tmpBrokerIdList;
     }
 
+    public List<Integer> getAllBrokerIdList() {
+        List<Integer> tmpBrokerIdList = new ArrayList<>();
+        if (data != null) {
+            for (BrokerInfo brokerInfo : data) {
+                tmpBrokerIdList.add(brokerInfo.getBrokerId());
+            }
+        }
+        return tmpBrokerIdList;
+    }
+
 }
diff --git 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
index 7131b83..bff5a7b 100644
--- 
a/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
+++ 
b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/tube/TubeHttpTopicInfoList.java
@@ -17,9 +17,16 @@
 
 package org.apache.tubemq.manager.service.tube;
 
+import static 
org.apache.tubemq.manager.service.TubeMQHttpConst.BATCH_ADD_TOPIC;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.OP_MODIFY;
+import static org.apache.tubemq.manager.service.TubeMQHttpConst.WEB_API;
+
 import java.util.ArrayList;
 import java.util.List;
 import lombok.Data;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.tubemq.manager.controller.node.request.AddTopicReq;
 import 
org.apache.tubemq.manager.service.tube.TubeHttpTopicInfoList.TopicInfoList.TopicInfo;
 
 /**
@@ -94,4 +101,35 @@ public class TubeHttpTopicInfoList {
         }
         return tmpBrokerIdList;
     }
+
+    public AddTopicReq getAddTopicReq(List<Integer> brokerIds, List<String> 
targetTopicNames, String token) {
+
+        AddTopicReq req = new AddTopicReq();
+        TopicInfoList topicInfoList = data.get(0);
+        if (topicInfoList == null) {
+            return req;
+        }
+        List<TopicInfo> topicInfos = topicInfoList.getTopicInfo();
+        if (CollectionUtils.isEmpty(topicInfos)) {
+            return req;
+        }
+
+        TopicInfo topicInfo = topicInfos.get(0);
+        String brokerStr = StringUtils.join(brokerIds, ",");
+        String topic = StringUtils.join(targetTopicNames, ",");
+
+        req.setBrokerId(brokerStr);
+        req.setTopicName(topic);
+        req.setMethod(BATCH_ADD_TOPIC);
+        req.setAcceptPublish(topicInfo.acceptPublish);
+        req.setAcceptSubscribe(topicInfo.acceptSubscribe);
+        req.setType(OP_MODIFY);
+        req.setCreateUser(WEB_API);
+        req.setDeleteWhen(topicInfo.getDeleteWhen());
+        req.setNumPartitions(topicInfo.getNumPartitions());
+        req.setUnflushInterval(topicInfo.getUnflushInterval());
+        req.setConfModAuthToken(token);
+        req.setDeletePolicy(topicInfo.getDeletePolicy());
+        return req;
+    }
 }
diff --git 
a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
 
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
index 1b3c0ed..8b15f3c 100644
--- 
a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
+++ 
b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java
@@ -19,7 +19,7 @@ package org.apache.tubemq.manager.controller;
 import java.net.URI;
 import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.tubemq.manager.controller.topic.TopicController;
+import org.apache.tubemq.manager.controller.topic.TopicTdmController;
 import org.apache.tubemq.manager.entry.TopicEntry;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,7 +59,7 @@ public class TestBusinessController {
 
     @Before
     public void setUp() {
-        mvc = MockMvcBuilders.standaloneSetup(new TopicController()).build();
+        mvc = MockMvcBuilders.standaloneSetup(new 
TopicTdmController()).build();
     }
 
     @Test

Reply via email to