This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 598c979  fix: pull timeout and suspend time is not consistent with java (#350)
598c979 is described below

commit 598c979ebd6db2212776e8e38d7f623090360481
Author: xujianhai666 <[email protected]>
AuthorDate: Fri Jan 3 11:42:40 2020 +0800

    fix: pull timeout and suspend time is not consistent with java (#350)
    
    - modify pull timeout
    - add suspend time
    
    Closes #349
---
 consumer/push_consumer.go | 19 ++++++++++---------
 internal/client.go        |  4 ++--
 internal/request.go       |  3 ++-
 3 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 74251a1..1add8dd 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -599,15 +599,16 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
                sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression 
!= "", classFilter)
 
                pullRequest := &internal.PullMessageRequestHeader{
-                       ConsumerGroup:  pc.consumerGroup,
-                       Topic:          request.mq.Topic,
-                       QueueId:        int32(request.mq.QueueId),
-                       QueueOffset:    request.nextOffset,
-                       MaxMsgNums:     pc.option.PullBatchSize,
-                       SysFlag:        sysFlag,
-                       CommitOffset:   commitOffsetValue,
-                       SubExpression:  _SubAll,
-                       ExpressionType: string(TAG), // TODO
+                       ConsumerGroup:        pc.consumerGroup,
+                       Topic:                request.mq.Topic,
+                       QueueId:              int32(request.mq.QueueId),
+                       QueueOffset:          request.nextOffset,
+                       MaxMsgNums:           pc.option.PullBatchSize,
+                       SysFlag:              sysFlag,
+                       CommitOffset:         commitOffsetValue,
+                       SubExpression:        _SubAll,
+                       ExpressionType:       string(TAG),
+                       SuspendTimeoutMillis: 20 * time.Second,
                }
                //
                //if data.ExpType == string(TAG) {
diff --git a/internal/client.go b/internal/client.go
index c41417d..fec61fd 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -51,7 +51,7 @@ const (
        _PersistOffset = 5 * time.Second
 
        // Rebalance interval
-       _RebalanceInterval = 10 * time.Second
+       _RebalanceInterval = 20 * time.Second
 )
 
 var (
@@ -544,7 +544,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, 
cmd *remote.RemotingC
 // PullMessage with sync
 func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, 
request *PullMessageRequestHeader) (*primitive.PullResult, error) {
        cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
-       res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 
10*time.Second)
+       res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 
30*time.Second)
        if err != nil {
                return nil, err
        }
diff --git a/internal/request.go b/internal/request.go
index 3e477ea..5438790 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -210,10 +210,11 @@ func (request *PullMessageRequestHeader) Encode() 
map[string]string {
        maps["maxMsgNums"] = fmt.Sprintf("%d", request.MaxMsgNums)
        maps["sysFlag"] = fmt.Sprintf("%d", request.SysFlag)
        maps["commitOffset"] = fmt.Sprintf("%d", request.CommitOffset)
-       maps["suspendTimeoutMillis"] = fmt.Sprintf("%d", 
request.SuspendTimeoutMillis)
+       maps["suspendTimeoutMillis"] = fmt.Sprintf("%d", 
request.SuspendTimeoutMillis/time.Millisecond)
        maps["subscription"] = request.SubExpression
        maps["subVersion"] = fmt.Sprintf("%d", request.SubVersion)
        maps["expressionType"] = request.ExpressionType
+
        return maps
 }
 

Reply via email to