This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cf97e1  [ISSUE #514] feature: add admintool, createTopic & 
deleteTopic (#515)
9cf97e1 is described below

commit 9cf97e115816a37c3426552df9601f50d3637639
Author: Shuyang Wu <[email protected]>
AuthorDate: Mon Oct 19 20:09:55 2020 +0800

    [ISSUE #514] feature: add admintool, createTopic & deleteTopic (#515)
    
    * feature: createTopic & deleteTopic, fix #514, fix #502
    
    * update deleteTopic to adapt sarama
    
    * move test to example; would mock and test in new PR
    
    * fix typo
    
    * rm commented code
    
    * update intro.md
    
    * rm commented code
    
    * rm unused code
    
    * add info/error to rlog
---
 admin/admin.go               | 202 +++++++++++++++++++++++++++++++++++++++++++
 admin/option.go              | 131 ++++++++++++++++++++++++++++
 docs/Introduction.md         |  25 ++++++
 docs/feature.md              |  15 +++-
 examples/admin/topic/main.go |  64 ++++++++++++++
 internal/request.go          | 127 +++++++++++++++++++++++----
 6 files changed, 544 insertions(+), 20 deletions(-)

diff --git a/admin/admin.go b/admin/admin.go
new file mode 100644
index 0000000..f45f39a
--- /dev/null
+++ b/admin/admin.go
@@ -0,0 +1,202 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package admin
+
+import (
+       "context"
+       "sync"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/v2/internal"
+       "github.com/apache/rocketmq-client-go/v2/internal/remote"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+type Admin interface {
+       CreateTopic(ctx context.Context, opts ...OptionCreate) error
+       DeleteTopic(ctx context.Context, opts ...OptionDelete) error
+       //TODO
+       //TopicList(ctx context.Context, mq *primitive.MessageQueue) 
(*remote.RemotingCommand, error)
+       //GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, 
error)
+       Close() error
+}
+
+// TODO: move outdated context to ctx
+type adminOptions struct {
+       internal.ClientOptions
+}
+
+type AdminOption func(options *adminOptions)
+
+func defaultAdminOptions() *adminOptions {
+       opts := &adminOptions{
+               ClientOptions: internal.DefaultClientOptions(),
+       }
+       opts.GroupName = "TOOLS_ADMIN"
+       opts.InstanceName = time.Now().String()
+       return opts
+}
+
+// WithResolver nameserver resolver to fetch nameserver addr
+func WithResolver(resolver primitive.NsResolver) AdminOption {
+       return func(options *adminOptions) {
+               options.Resolver = resolver
+       }
+}
+
+type admin struct {
+       cli     internal.RMQClient
+       namesrv internal.Namesrvs
+
+       opts *adminOptions
+
+       closeOnce sync.Once
+}
+
+// NewAdmin initialize admin
+func NewAdmin(opts ...AdminOption) (Admin, error) {
+       defaultOpts := defaultAdminOptions()
+       for _, opt := range opts {
+               opt(defaultOpts)
+       }
+
+       cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+       namesrv, err := internal.NewNamesrv(defaultOpts.Resolver)
+       if err != nil {
+               return nil, err
+       }
+       //log.Printf("Client: %#v", namesrv.srvs)
+       return &admin{
+               cli:     cli,
+               namesrv: namesrv,
+               opts:    defaultOpts,
+       }, nil
+}
+
+// CreateTopic create topic.
+// TODO: another implementation like sarama, without brokerAddr as input
+func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error {
+       cfg := defaultTopicConfigCreate()
+       for _, apply := range opts {
+               apply(&cfg)
+       }
+
+       request := &internal.CreateTopicRequestHeader{
+               Topic:           cfg.Topic,
+               DefaultTopic:    cfg.DefaultTopic,
+               ReadQueueNums:   cfg.ReadQueueNums,
+               WriteQueueNums:  cfg.WriteQueueNums,
+               Perm:            cfg.Perm,
+               TopicFilterType: cfg.TopicFilterType,
+               TopicSysFlag:    cfg.TopicSysFlag,
+               Order:           cfg.Order,
+       }
+
+       cmd := remote.NewRemotingCommand(internal.ReqCreateTopic, request, nil)
+       _, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, cmd, 5*time.Second)
+       if err != nil {
+               rlog.Error("create topic error", map[string]interface{}{
+                       rlog.LogKeyTopic:         cfg.Topic,
+                       rlog.LogKeyBroker:        cfg.BrokerAddr,
+                       rlog.LogKeyUnderlayError: err,
+               })
+       } else {
+               rlog.Info("create topic success", map[string]interface{}{
+                       rlog.LogKeyTopic:  cfg.Topic,
+                       rlog.LogKeyBroker: cfg.BrokerAddr,
+               })
+       }
+       return err
+}
+
+// DeleteTopicInBroker delete topic in broker.
+func (a *admin) deleteTopicInBroker(ctx context.Context, topic string, 
brokerAddr string) (*remote.RemotingCommand, error) {
+       request := &internal.DeleteTopicRequestHeader{
+               Topic: topic,
+       }
+
+       cmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInBroker, 
request, nil)
+       return a.cli.InvokeSync(ctx, brokerAddr, cmd, 5*time.Second)
+}
+
+// DeleteTopicInNameServer delete topic in nameserver.
+func (a *admin) deleteTopicInNameServer(ctx context.Context, topic string, 
nameSrvAddr string) (*remote.RemotingCommand, error) {
+       request := &internal.DeleteTopicRequestHeader{
+               Topic: topic,
+       }
+
+       cmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInNameSrv, 
request, nil)
+       return a.cli.InvokeSync(ctx, nameSrvAddr, cmd, 5*time.Second)
+}
+
+// DeleteTopic delete topic in both broker and nameserver.
+func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
+       cfg := defaultTopicConfigDelete()
+       for _, apply := range opts {
+               apply(&cfg)
+       }
+       //delete topic in broker
+       if cfg.BrokerAddr == "" {
+               a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+               cfg.BrokerAddr = a.namesrv.FindBrokerAddrByTopic(cfg.Topic)
+       }
+
+       if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err 
!= nil {
+               if err != nil {
+                       rlog.Error("delete topic in broker error", 
map[string]interface{}{
+                               rlog.LogKeyTopic:         cfg.Topic,
+                               rlog.LogKeyBroker:        cfg.BrokerAddr,
+                               rlog.LogKeyUnderlayError: err,
+                       })
+               }
+               return err
+       }
+
+       //delete topic in nameserver
+       if len(cfg.NameSrvAddr) == 0 {
+               a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+               cfg.NameSrvAddr = a.namesrv.AddrList()
+       }
+
+       for _, nameSrvAddr := range cfg.NameSrvAddr {
+               if _, err := a.deleteTopicInNameServer(ctx, cfg.Topic, 
nameSrvAddr); err != nil {
+                       if err != nil {
+                               rlog.Error("delete topic in name server error", 
map[string]interface{}{
+                                       rlog.LogKeyTopic:         cfg.Topic,
+                                       "nameServer":             nameSrvAddr,
+                                       rlog.LogKeyUnderlayError: err,
+                               })
+                       }
+                       return err
+               }
+       }
+       rlog.Info("delete topic success", map[string]interface{}{
+               rlog.LogKeyTopic:  cfg.Topic,
+               rlog.LogKeyBroker: cfg.BrokerAddr,
+               "nameServer":      cfg.NameSrvAddr,
+       })
+       return nil
+}
+
+func (a *admin) Close() error {
+       a.closeOnce.Do(func() {
+               a.cli.Shutdown()
+       })
+       return nil
+}
diff --git a/admin/option.go b/admin/option.go
new file mode 100644
index 0000000..d5a648e
--- /dev/null
+++ b/admin/option.go
@@ -0,0 +1,131 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package admin
+
+func defaultTopicConfigCreate() TopicConfigCreate {
+       opts := TopicConfigCreate{
+               DefaultTopic:    "defaultTopic",
+               ReadQueueNums:   8,
+               WriteQueueNums:  8,
+               Perm:            6,
+               TopicFilterType: "SINGLE_TAG",
+               TopicSysFlag:    0,
+               Order:           false,
+       }
+       return opts
+}
+
+type TopicConfigCreate struct {
+       Topic           string
+       BrokerAddr      string
+       DefaultTopic    string
+       ReadQueueNums   int
+       WriteQueueNums  int
+       Perm            int
+       TopicFilterType string
+       TopicSysFlag    int
+       Order           bool
+}
+
+type OptionCreate func(*TopicConfigCreate)
+
+func WithTopicCreate(Topic string) OptionCreate {
+       return func(opts *TopicConfigCreate) {
+               opts.Topic = Topic
+       }
+}
+
+func WithBrokerAddrCreate(BrokerAddr string) OptionCreate {
+       return func(opts *TopicConfigCreate) {
+               opts.BrokerAddr = BrokerAddr
+       }
+}
+
+func WithReadQueueNums(ReadQueueNums int) OptionCreate {
+       return func(opts *TopicConfigCreate) {
+               opts.ReadQueueNums = ReadQueueNums
+       }
+}
+
+func WithWriteQueueNums(WriteQueueNums int) OptionCreate {
+       return func(opts *TopicConfigCreate) {
+               opts.WriteQueueNums = WriteQueueNums
+       }
+}
+
+func WithPerm(Perm int) OptionCreate {
+       return func(opts *TopicConfigCreate) {
+               opts.Perm = Perm
+       }
+}
+
+func WithTopicFilterType(TopicFilterType string) OptionCreate {
+       return func(opts *TopicConfigCreate) {
+               opts.TopicFilterType = TopicFilterType
+       }
+}
+
+func WithTopicSysFlag(TopicSysFlag int) OptionCreate {
+       return func(opts *TopicConfigCreate) {
+               opts.TopicSysFlag = TopicSysFlag
+       }
+}
+
+func WithOrder(Order bool) OptionCreate {
+       return func(opts *TopicConfigCreate) {
+               opts.Order = Order
+       }
+}
+
+func defaultTopicConfigDelete() TopicConfigDelete {
+       opts := TopicConfigDelete{}
+       return opts
+}
+
+type TopicConfigDelete struct {
+       Topic       string
+       ClusterName string
+       NameSrvAddr []string
+       BrokerAddr  string
+}
+
+type OptionDelete func(*TopicConfigDelete)
+
+func WithTopicDelete(Topic string) OptionDelete {
+       return func(opts *TopicConfigDelete) {
+               opts.Topic = Topic
+       }
+}
+
+func WithBrokerAddrDelete(BrokerAddr string) OptionDelete {
+       return func(opts *TopicConfigDelete) {
+               opts.BrokerAddr = BrokerAddr
+       }
+}
+
+func WithClusterName(ClusterName string) OptionDelete {
+       return func(opts *TopicConfigDelete) {
+               opts.ClusterName = ClusterName
+       }
+}
+
+func WithNameSrvAddr(NameSrvAddr []string) OptionDelete {
+       return func(opts *TopicConfigDelete) {
+               opts.NameSrvAddr = NameSrvAddr
+       }
+}
diff --git a/docs/Introduction.md b/docs/Introduction.md
index f5b8e12..2132b91 100644
--- a/docs/Introduction.md
+++ b/docs/Introduction.md
@@ -101,3 +101,28 @@ err = c.Start()
 ```
 
 Full examples: [consumer](../examples/consumer)
+
+
+### Admin: Topic Operation
+
+#### Examples
+- create topic
+```
+testAdmin, err := 
admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})))
+err = testAdmin.CreateTopic(
+       context.Background(),
+       admin.WithTopicCreate("newTopic"),
+       admin.WithBrokerAddrCreate("127.0.0.1:10911"),
+)
+```
+
+- delete topic
+`ClusterName` not supported yet
+```
+err = testAdmin.DeleteTopic(
+       context.Background(),
+       admin.WithTopicDelete("newTopic"),
+       //admin.WithBrokerAddrDelete("127.0.0.1:10911"),        //optional
+       //admin.WithNameSrvAddr(nameSrvAddr),                           
//optional
+)
+```
\ No newline at end of file
diff --git a/docs/feature.md b/docs/feature.md
index 795ffac..eed3e17 100644
--- a/docs/feature.md
+++ b/docs/feature.md
@@ -70,4 +70,17 @@
 - [ ] Other
     - [ ] VIPChannel
     - [ ] RPCHook
-    
\ No newline at end of file
+    
+## Admin
+
+### Topic/Cluster
+- [x] updateTopic
+- [x] deleteTopic
+- [ ] updateSubGroup
+- [ ] deleteSubGroup
+- [ ] updateBrokerConfig
+- [ ] updateTopicPerm
+- [ ] listTopic
+- [ ] topicRoute
+- [ ] topicStatus
+- [ ] topicClusterList
\ No newline at end of file
diff --git a/examples/admin/topic/main.go b/examples/admin/topic/main.go
new file mode 100644
index 0000000..ef9a536
--- /dev/null
+++ b/examples/admin/topic/main.go
@@ -0,0 +1,64 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/rocketmq-client-go/v2/admin"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+       topic := "newOne"
+       //clusterName := "DefaultCluster"
+       nameSrvAddr := []string{"127.0.0.1:9876"}
+       brokerAddr := "127.0.0.1:10911"
+
+       testAdmin, err := 
admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)))
+       if err != nil {
+               fmt.Println(err.Error())
+       }
+
+       //create topic
+       err = testAdmin.CreateTopic(
+               context.Background(),
+               admin.WithTopicCreate(topic),
+               admin.WithBrokerAddrCreate(brokerAddr),
+       )
+       if err != nil {
+               fmt.Println("Create topic error:", err.Error())
+       }
+
+       //deletetopic
+       err = testAdmin.DeleteTopic(
+               context.Background(),
+               admin.WithTopicDelete(topic),
+               //admin.WithBrokerAddrDelete(brokerAddr),
+               //admin.WithNameSrvAddr(nameSrvAddr),
+       )
+       if err != nil {
+               fmt.Println("Delete topic error:", err.Error())
+       }
+
+       err = testAdmin.Close()
+       if err != nil {
+               fmt.Printf("Shutdown admin error: %s", err.Error())
+       }
+}
diff --git a/internal/request.go b/internal/request.go
index 5438790..fa88efe 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -24,25 +24,33 @@ import (
 )
 
 const (
-       ReqSendMessage              = int16(10)
-       ReqPullMessage              = int16(11)
-       ReqQueryConsumerOffset      = int16(14)
-       ReqUpdateConsumerOffset     = int16(15)
-       ReqSearchOffsetByTimestamp  = int16(29)
-       ReqGetMaxOffset             = int16(30)
-       ReqHeartBeat                = int16(34)
-       ReqConsumerSendMsgBack      = int16(36)
-       ReqENDTransaction           = int16(37)
-       ReqGetConsumerListByGroup   = int16(38)
-       ReqLockBatchMQ              = int16(41)
-       ReqUnlockBatchMQ            = int16(42)
-       ReqGetRouteInfoByTopic      = int16(105)
-       ReqSendBatchMessage         = int16(320)
-       ReqCheckTransactionState    = int16(39)
-       ReqNotifyConsumerIdsChanged = int16(40)
-       ReqResetConsuemrOffset      = int16(220)
-       ReqGetConsumerRunningInfo   = int16(307)
-       ReqConsumeMessageDirectly   = int16(309)
+       ReqSendMessage                   = int16(10)
+       ReqPullMessage                   = int16(11)
+       ReqQueryMessage                  = int16(12)
+       ReqQueryConsumerOffset           = int16(14)
+       ReqUpdateConsumerOffset          = int16(15)
+       ReqCreateTopic                   = int16(17)
+       ReqSearchOffsetByTimestamp       = int16(29)
+       ReqGetMaxOffset                  = int16(30)
+       ReqGetMinOffset                  = int16(31)
+       ReqViewMessageByID               = int16(33)
+       ReqHeartBeat                     = int16(34)
+       ReqConsumerSendMsgBack           = int16(36)
+       ReqENDTransaction                = int16(37)
+       ReqGetConsumerListByGroup        = int16(38)
+       ReqLockBatchMQ                   = int16(41)
+       ReqUnlockBatchMQ                 = int16(42)
+       ReqGetRouteInfoByTopic           = int16(105)
+       ReqGetBrokerClusterInfo          = int16(106)
+       ReqSendBatchMessage              = int16(320)
+       ReqCheckTransactionState         = int16(39)
+       ReqNotifyConsumerIdsChanged      = int16(40)
+       ReqGetAllTopicListFromNameServer = int16(206)
+       ReqDeleteTopicInBroker           = int16(215)
+       ReqDeleteTopicInNameSrv          = int16(216)
+       ReqResetConsuemrOffset           = int16(220)
+       ReqGetConsumerRunningInfo        = int16(307)
+       ReqConsumeMessageDirectly        = int16(309)
 )
 
 type SendMessageRequestHeader struct {
@@ -318,3 +326,84 @@ func (request *GetConsumerRunningInfoHeader) 
Decode(properties map[string]string
                request.clientID = v
        }
 }
+
+type QueryMessageRequestHeader struct {
+       Topic          string
+       Key            string
+       MaxNum         int
+       BeginTimestamp int64
+       EndTimestamp   int64
+}
+
+func (request *QueryMessageRequestHeader) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["topic"] = request.Topic
+       maps["key"] = request.Key
+       maps["maxNum"] = fmt.Sprintf("%d", request.MaxNum)
+       maps["beginTimestamp"] = strconv.FormatInt(request.BeginTimestamp, 10)
+       maps["endTimestamp"] = fmt.Sprintf("%d", request.EndTimestamp)
+
+       return maps
+}
+
+func (request *QueryMessageRequestHeader) Decode(properties map[string]string) 
error {
+       return nil
+}
+
+type ViewMessageRequestHeader struct {
+       Offset int64
+}
+
+func (request *ViewMessageRequestHeader) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["offset"] = strconv.FormatInt(request.Offset, 10)
+
+       return maps
+}
+
+type CreateTopicRequestHeader struct {
+       Topic           string
+       DefaultTopic    string
+       ReadQueueNums   int
+       WriteQueueNums  int
+       Perm            int
+       TopicFilterType string
+       TopicSysFlag    int
+       Order           bool
+}
+
+func (request *CreateTopicRequestHeader) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["topic"] = request.Topic
+       maps["defaultTopic"] = request.DefaultTopic
+       maps["readQueueNums"] = fmt.Sprintf("%d", request.ReadQueueNums)
+       maps["writeQueueNums"] = fmt.Sprintf("%d", request.WriteQueueNums)
+       maps["perm"] = fmt.Sprintf("%d", request.Perm)
+       maps["topicFilterType"] = request.TopicFilterType
+       maps["topicSysFlag"] = fmt.Sprintf("%d", request.TopicSysFlag)
+       maps["order"] = strconv.FormatBool(request.Order)
+
+       return maps
+}
+
+type TopicListRequestHeader struct {
+       Topic string
+}
+
+func (request *TopicListRequestHeader) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["topic"] = request.Topic
+
+       return maps
+}
+
+type DeleteTopicRequestHeader struct {
+       Topic string
+}
+
+func (request *DeleteTopicRequestHeader) Encode() map[string]string {
+       maps := make(map[string]string)
+       maps["topic"] = request.Topic
+
+       return maps
+}

Reply via email to