This is an automated email from the ASF dual-hosted git repository.

tigerlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bca88d  fix: unlock all queues when consumer shutdown in orderly model
     new 3b8b238  Merge pull request #1015 from cserwen/orderly_consume_github
9bca88d is described below

commit 9bca88dcb44a0f6a4e21639bb269271eafafbe64
Author: dengzhiwen1 <[email protected]>
AuthorDate: Fri Mar 10 14:03:58 2023 +0800

    fix: unlock all queues when consumer shutdown in orderly model
---
 consumer/consumer.go      | 12 ++++++------
 consumer/push_consumer.go |  4 +++-
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 1511f01..7bd2b02 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -631,12 +631,12 @@ func (dc *defaultConsumer) doUnlock(addr string, body 
*lockBatchRequestBody, one
                }
        } else {
                response, err := dc.client.InvokeSync(context.Background(), 
addr, request, 1*time.Second)
-               rlog.Error("lock MessageQueue to broker invoke error", 
map[string]interface{}{
-                       rlog.LogKeyBroker:        addr,
-                       rlog.LogKeyUnderlayError: err,
-               })
-               if response.Code != internal.ResSuccess {
-                       // TODO error
+               if err != nil || response == nil || response.Code != 
internal.ResSuccess {
+                       rlog.Error("lock MessageQueue to broker invoke error", 
map[string]interface{}{
+                               rlog.LogKeyBroker:        addr,
+                               rlog.LogKeyUnderlayError: err,
+                               "response":               response,
+                       })
                }
        }
 }
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 7d585de..85f9725 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -277,7 +277,9 @@ func (pc *pushConsumer) Shutdown() error {
                        pc.option.TraceDispatcher.Close()
                }
                close(pc.done)
-
+               if pc.consumeOrderly && pc.model == Clustering {
+                       pc.unlockAll(false)
+               }
                pc.client.UnregisterConsumer(pc.consumerGroup)
                err = pc.defaultConsumer.shutdown()
        })

Reply via email to