This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 33798bc  [ISSUE #371]feat: clean extra code for internal package (#372)
33798bc is described below

commit 33798bc30375afecaf1605cfdd8efee9527fb5f4
Author: xujianhai666 <[email protected]>
AuthorDate: Tue Jan 7 11:56:18 2020 +0800

    [ISSUE #371]feat: clean extra code for internal package (#372)
    
    * feat: clean extra code for internal package
    
    Closes #371
    
    * fix
---
 consumer/push_consumer.go |  1 +
 internal/client.go        | 17 +++--------------
 internal/mock_client.go   | 23 +++++++++++------------
 internal/route.go         | 11 -----------
 internal/validators.go    |  4 ----
 producer/producer.go      |  1 +
 producer/producer_test.go |  1 +
 7 files changed, 17 insertions(+), 41 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 1add8dd..0c7f224 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -212,6 +212,7 @@ func (pc *pushConsumer) Shutdown() error {
                pc.lockTicker.Stop()
                close(pc.done)
 
+               pc.client.UnregisterConsumer(pc.consumerGroup)
                err = pc.defaultConsumer.shutdown()
        })
 
diff --git a/internal/client.go b/internal/client.go
index acc25ea..ca8cc88 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -131,6 +131,7 @@ type RMQClient interface {
        ClientID() string
 
        RegisterProducer(group string, producer InnerProducer)
+       UnregisterProducer(group string)
        InvokeSync(ctx context.Context, addr string, request 
*remote.RemotingCommand,
                timeoutMillis time.Duration) (*remote.RemotingCommand, error)
        InvokeAsync(ctx context.Context, addr string, request 
*remote.RemotingCommand,
@@ -146,7 +147,6 @@ type RMQClient interface {
        RegisterConsumer(group string, consumer InnerConsumer) error
        UnregisterConsumer(group string)
        PullMessage(ctx context.Context, brokerAddrs string, request 
*PullMessageRequestHeader) (*primitive.PullResult, error)
-       PullMessageAsync(ctx context.Context, brokerAddrs string, request 
*PullMessageRequestHeader, f func(result *primitive.PullResult)) error
        RebalanceImmediately()
        UpdatePublishInfo(topic string, data *TopicRouteData, changed bool)
 }
@@ -600,11 +600,6 @@ func (c *rmqClient) decodeCommandCustomHeader(pr 
*primitive.PullResult, cmd *rem
        }
 }
 
-// PullMessageAsync pull message async
-func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, 
request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error {
-       return nil
-}
-
 func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) 
error {
        _, exist := c.consumerMap.Load(group)
        if exist {
@@ -618,6 +613,7 @@ func (c *rmqClient) RegisterConsumer(group string, consumer 
InnerConsumer) error
 }
 
 func (c *rmqClient) UnregisterConsumer(group string) {
+       c.consumerMap.Delete(group)
 }
 
 func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) {
@@ -625,14 +621,7 @@ func (c *rmqClient) RegisterProducer(group string, 
producer InnerProducer) {
 }
 
 func (c *rmqClient) UnregisterProducer(group string) {
-}
-
-func (c *rmqClient) SelectProducer(group string) InnerProducer {
-       return nil
-}
-
-func (c *rmqClient) SelectConsumer(group string) InnerConsumer {
-       return nil
+       c.producerMap.Delete(group)
 }
 
 func (c *rmqClient) RebalanceImmediately() {
diff --git a/internal/mock_client.go b/internal/mock_client.go
index d01a51f..d8ed7b2 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -14,6 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
+
 // Code generated by MockGen. DO NOT EDIT.
 // Source: client.go
 
@@ -268,6 +269,16 @@ func (mr *MockRMQClientMockRecorder) 
RegisterProducer(group, producer interface{
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"RegisterProducer", reflect.TypeOf((*MockRMQClient)(nil).RegisterProducer), 
group, producer)
 }
 
+// UnregisterProducer mocks base method
+func (m *MockRMQClient) UnregisterProducer(group string) {
+       m.ctrl.Call(m, "UnregisterProducer", group)
+}
+
+// UnregisterProducer indicates an expected call of UnregisterProducer
+func (mr *MockRMQClientMockRecorder) UnregisterProducer(group interface{}) 
*gomock.Call {
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UnregisterProducer", reflect.TypeOf((*MockRMQClient)(nil).UnregisterProducer), 
group)
+}
+
 // InvokeSync mocks base method
 func (m *MockRMQClient) InvokeSync(ctx context.Context, addr string, request 
*remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, 
error) {
        ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeoutMillis)
@@ -387,18 +398,6 @@ func (mr *MockRMQClientMockRecorder) PullMessage(ctx, 
brokerAddrs, request inter
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessage", 
reflect.TypeOf((*MockRMQClient)(nil).PullMessage), ctx, brokerAddrs, request)
 }
 
-// PullMessageAsync mocks base method
-func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs 
string, request *PullMessageRequestHeader, f func(*primitive.PullResult)) error 
{
-       ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
-       ret0, _ := ret[0].(error)
-       return ret0
-}
-
-// PullMessageAsync indicates an expected call of PullMessageAsync
-func (mr *MockRMQClientMockRecorder) PullMessageAsync(ctx, brokerAddrs, 
request, f interface{}) *gomock.Call {
-       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"PullMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).PullMessageAsync), 
ctx, brokerAddrs, request, f)
-}
-
 // RebalanceImmediately mocks base method
 func (m *MockRMQClient) RebalanceImmediately() {
        m.ctrl.Call(m, "RebalanceImmediately")
diff --git a/internal/route.go b/internal/route.go
index f4d4116..c5e771e 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -113,7 +113,6 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
 }
 
 func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
-       // Todo process lock timeout
        s.lockNamesrv.Lock()
        defer s.lockNamesrv.Unlock()
 
@@ -258,16 +257,6 @@ func (s *namesrvs) FetchSubscribeMessageQueues(topic 
string) ([]*primitive.Messa
        return mqs, nil
 }
 
-func (s *namesrvs) FindMQByTopic(topic string) *primitive.MessageQueue {
-       mqs, err := s.FetchPublishMessageQueues(topic)
-       if err != nil {
-               return nil
-       }
-       r := rand.New(rand.NewSource(time.Now().UnixNano()))
-       i := utils.AbsInt(r.Int())
-       return mqs[i%len(mqs)]
-}
-
 func (s *namesrvs) FetchPublishMessageQueues(topic string) 
([]*primitive.MessageQueue, error) {
        var (
                err       error
diff --git a/internal/validators.go b/internal/validators.go
index 7753942..e693fde 100644
--- a/internal/validators.go
+++ b/internal/validators.go
@@ -37,10 +37,6 @@ func ValidateGroup(group string) {
                rlog.Fatal("consumerGroup is empty", nil)
        }
 
-       //if !_Pattern.Match([]byte(group)) {
-       //      rlog.Fatalf("the specified group[%s] contains illegal 
characters, allowing only %s", group, _ValidPattern)
-       //}
-
        if len(group) > _CharacterMaxLength {
                rlog.Fatal("the specified group is longer than group max length 
255.", nil)
        }
diff --git a/producer/producer.go b/producer/producer.go
index 8e0661f..a47d76f 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -87,6 +87,7 @@ func (p *defaultProducer) Start() error {
 
 func (p *defaultProducer) Shutdown() error {
        atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
+       p.client.UnregisterProducer(p.group)
        p.client.Shutdown()
        return nil
 }
diff --git a/producer/producer_test.go b/producer/producer_test.go
index 387e9b0..e1273bb 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -51,6 +51,7 @@ func TestShutdown(t *testing.T) {
        assert.Nil(t, err)
 
        client.EXPECT().Shutdown().Return()
+       client.EXPECT().UnregisterProducer(gomock.Any()).Return()
        err = p.Shutdown()
        assert.Nil(t, err)
 

Reply via email to