liuruiyiyang commented on a change in pull request #515: URL: https://github.com/apache/rocketmq-client-go/pull/515#discussion_r484729178
########## File path: examples/admin/topic/main.go ########## @@ -0,0 +1,106 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "log" + + "github.com/apache/rocketmq-client-go/v2/admin" + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +func initAdmin() admin.Admin { + testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"}))) + if err != nil { + fmt.Println(err.Error()) + } + return testAdmin +} + +func main() { + testAdmin := initAdmin() + + topic := "newOne" + //clusterName := "DefaultCluster" + //nameSrvAddr := "127.0.0.1:9876" + brokerAddr := "127.0.0.1:10911" + + //create topic + err := testAdmin.CreateTopic( + context.Background(), + admin.WithTopicCreate(topic), + admin.WithBrokerAddrCreate(brokerAddr), + ) + if err != nil { + fmt.Println(err.Error()) + } + log.Printf("create topic to %v success", brokerAddr) + + //deletetopic + err = testAdmin.DeleteTopic( + context.Background(), + admin.WithTopicDelete(topic), + //WithBrokerAddrDelete(), + //WithClusterName(clusterName), + //WithNameSrvAddr(strings.Split(nameSrvAddr, ",")), + ) + if err != nil { + fmt.Println(err.Error()) + } + log.Printf("delete topic [%v] from Cluster success", topic) + log.Printf("delete topic [%v] from NameServer success", topic) +} + +/* +//TODO: another implementation like sarama, without brokerAddr as input(would be in admin) Review comment: It is better to remove useless code. ########## File path: api.go ########## @@ -136,3 +137,19 @@ type PullConsumer interface { func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) { return nil, errors.New("pull consumer has not supported") } + +func createTopic(opts ...admin.OptionCreate) error { Review comment: Provide a usage example or delete the useless function. ########## File path: admin/admin.go ########## @@ -0,0 +1,200 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admin + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/apache/rocketmq-client-go/v2/internal" + "github.com/apache/rocketmq-client-go/v2/internal/remote" + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +type Admin interface { + CreateTopic(ctx context.Context, opts ...OptionCreate) error + //TopicList(ctx context.Context, mq *primitive.MessageQueue) (*remote.RemotingCommand, error) + //GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error) + + DeleteTopic(ctx context.Context, opts ...OptionDelete) error + + Close() error +} + +// TODO: 超时的内容, 全部转移到 ctx +type adminOptions struct { + internal.ClientOptions +} + +type AdminOption func(options *adminOptions) + +func defaultAdminOptions() *adminOptions { + opts := &adminOptions{ + ClientOptions: internal.DefaultClientOptions(), + } + opts.GroupName = "TOOLS_ADMIN" + opts.InstanceName = time.Now().String() + return opts +} + +// WithResolver nameserver resolver to fetch nameserver addr +func WithResolver(resolver primitive.NsResolver) AdminOption { + return func(options *adminOptions) { + options.Resolver = resolver + } +} + +type admin struct { + cli internal.RMQClient + namesrv internal.Namesrvs + + opts *adminOptions + + closeOnce sync.Once +} + +// NewAdmin initialize admin +func NewAdmin(opts ...AdminOption) (Admin, error) { + defaultOpts := defaultAdminOptions() + for _, opt := range opts { + opt(defaultOpts) + } + + cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil) + namesrv, err := internal.NewNamesrv(defaultOpts.Resolver) + if err != nil { + return nil, err + } + //log.Printf("Client: %#v", namesrv.srvs) + return &admin{ + cli: cli, + namesrv: namesrv, + opts: defaultOpts, + }, nil +} + +func (a *admin) getAddr(mq *primitive.MessageQueue) (string, error) { + broker := a.namesrv.FindBrokerAddrByName(mq.BrokerName) + if len(broker) == 0 { + a.namesrv.UpdateTopicRouteInfo(mq.Topic) + broker = a.namesrv.FindBrokerAddrByName(mq.BrokerName) + + if len(broker) == 0 { + return "", fmt.Errorf("broker: %s address not found", mq.BrokerName) + } + } + return broker, nil +} + +// CreateTopic create topic. +func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error { + cfg := defaultTopicConfigCreate() + for _, apply := range opts { + apply(&cfg) + } + + request := &internal.CreateTopicRequestHeader{ + Topic: cfg.Topic, + DefaultTopic: cfg.DefaultTopic, + ReadQueueNums: cfg.ReadQueueNums, + WriteQueueNums: cfg.WriteQueueNums, + Perm: cfg.Perm, + TopicFilterType: cfg.TopicFilterType, + TopicSysFlag: cfg.TopicSysFlag, + Order: cfg.Order, + } + + cmd := remote.NewRemotingCommand(internal.ReqCreateTopic, request, nil) + _, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, cmd, 5*time.Second) + return err +} + +/* +// DeleteTopicInBroker delete topic in broker. Review comment: It is better to remove the useless code. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
