This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push: new ab6584b [ISSUE #1205]Exposing cluster from admin (#1221) ab6584b is described below commit ab6584b3b3f95e050d4ef99c84ab3c3c9e80d059 Author: takagi <82871248+takagi...@users.noreply.github.com> AuthorDate: Tue Sep 9 19:20:10 2025 +0800 [ISSUE #1205]Exposing cluster from admin (#1221) Co-authored-by: weilin <zhaoxuyao....@alibaba-inc.com> --- admin/admin.go | 5 +++++ internal/mock_namesrv.go | 15 +++++++++++++++ internal/namesrv.go | 2 ++ internal/route.go | 16 ++++++++++++++++ 4 files changed, 38 insertions(+) diff --git a/admin/admin.go b/admin/admin.go index a92a25c..04a08a5 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -38,6 +38,7 @@ type Admin interface { FetchAllTopicList(ctx context.Context) (*TopicList, error) //GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error) FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) + FetchClusterList(topic string) ([]string, error) Close() error } @@ -276,6 +277,10 @@ func (a *admin) FetchPublishMessageQueues(ctx context.Context, topic string) ([] return a.cli.GetNameSrv().FetchPublishMessageQueues(utils.WrapNamespace(a.opts.Namespace, topic)) } +func (a *admin) FetchClusterList(topic string) ([]string, error) { + return a.cli.GetNameSrv().FetchClusterList(topic) +} + func (a *admin) Close() error { a.closeOnce.Do(func() { a.cli.Shutdown() diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go index 7ce6f97..abfbf3e 100644 --- a/internal/mock_namesrv.go +++ b/internal/mock_namesrv.go @@ -191,3 +191,18 @@ func (mr *MockNamesrvsMockRecorder) AddrList() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrList", reflect.TypeOf((*MockNamesrvs)(nil).AddrList)) } + +// FetchClusterList mocks base method +func (m *MockNamesrvs) FetchClusterList(topic string) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchClusterList", topic) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchClusterList indicates an expected call of FetchClusterList +func (mr *MockNamesrvsMockRecorder) FetchClusterList(topic interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchClusterList", reflect.TypeOf((*MockNamesrvs)(nil).FetchClusterList), topic) +} diff --git a/internal/namesrv.go b/internal/namesrv.go index c347998..ac7643d 100644 --- a/internal/namesrv.go +++ b/internal/namesrv.go @@ -62,6 +62,8 @@ type Namesrvs interface { FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) + FetchClusterList(topic string) ([]string, error) + AddrList() []string } diff --git a/internal/route.go b/internal/route.go index 56f764f..4af8619 100644 --- a/internal/route.go +++ b/internal/route.go @@ -530,6 +530,22 @@ func (s *namesrvs) routeData2PublishInfo(topic string, data *TopicRouteData) *To return publishInfo } +func (s *namesrvs) FetchClusterList(topic string) ([]string, error) { + routeData, err := s.queryTopicRouteInfoFromServer(topic) + if err != nil { + return nil, err + } + clusterSet := make(map[string]struct{}) + for _, bd := range routeData.BrokerDataList { + clusterSet[bd.Cluster] = struct{}{} + } + clusterList := make([]string, 0, len(clusterSet)) + for cluster := range clusterSet { + clusterList = append(clusterList, cluster) + } + return clusterList, nil +} + // TopicRouteData TopicRouteData type TopicRouteData struct { OrderTopicConf string