This is an automated email from the ASF dual-hosted git repository.

jianhaixu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f554daf  allow further subscription even after client starts, to align with Java SDK's behavior
     new 9d9e413  Merge pull request #560 from beiwei30/subscribe-after-start
f554daf is described below

commit f554daff344769434c823c9cb57e1e6f48b4c390
Author: Ian Luo <[email protected]>
AuthorDate: Tue Nov 24 19:48:25 2020 +0800

    allow further subscription even after client starts, to align with Java SDK's behavior
---
 consumer/push_consumer.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 58945c2..931fefa 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -222,9 +222,11 @@ func (pc *pushConsumer) Shutdown() error {
 
 func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
        f func(context.Context, ...*primitive.MessageExt) (ConsumeResult, 
error)) error {
-       if atomic.LoadInt32(&pc.state) != int32(internal.StateCreateJust) {
-               return errors.New("subscribe topic only started before")
+       if atomic.LoadInt32(&pc.state) == int32(internal.StateStartFailed) ||
+               atomic.LoadInt32(&pc.state) == int32(internal.StateShutdown) {
+               return errors.New("cannot subscribe topic since client either 
failed to start or has been shutdown.")
        }
+
        if pc.option.Namespace != "" {
                topic = pc.option.Namespace + "%" + topic
        }

Reply via email to