This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 598c979 fix: pull timeout and suspend time is not consistent with java (#350)
598c979 is described below
commit 598c979ebd6db2212776e8e38d7f623090360481
Author: xujianhai666 <[email protected]>
AuthorDate: Fri Jan 3 11:42:40 2020 +0800
fix: pull timeout and suspend time is not consistent with java (#350)
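- modify pull timeout (sync pull invoke timeout raised from 10s to 30s)
- add suspend time (SuspendTimeoutMillis set to 20s on the pull request header)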
Closes #349
---
consumer/push_consumer.go | 19 ++++++++++---------
internal/client.go | 4 ++--
internal/request.go | 3 ++-
3 files changed, 14 insertions(+), 12 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 74251a1..1add8dd 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -599,15 +599,16 @@ func (pc *pushConsumer) pullMessage(request *PullRequest)
{
sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression
!= "", classFilter)
pullRequest := &internal.PullMessageRequestHeader{
- ConsumerGroup: pc.consumerGroup,
- Topic: request.mq.Topic,
- QueueId: int32(request.mq.QueueId),
- QueueOffset: request.nextOffset,
- MaxMsgNums: pc.option.PullBatchSize,
- SysFlag: sysFlag,
- CommitOffset: commitOffsetValue,
- SubExpression: _SubAll,
- ExpressionType: string(TAG), // TODO
+ ConsumerGroup: pc.consumerGroup,
+ Topic: request.mq.Topic,
+ QueueId: int32(request.mq.QueueId),
+ QueueOffset: request.nextOffset,
+ MaxMsgNums: pc.option.PullBatchSize,
+ SysFlag: sysFlag,
+ CommitOffset: commitOffsetValue,
+ SubExpression: _SubAll,
+ ExpressionType: string(TAG),
+ SuspendTimeoutMillis: 20 * time.Second,
}
//
//if data.ExpType == string(TAG) {
diff --git a/internal/client.go b/internal/client.go
index c41417d..fec61fd 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -51,7 +51,7 @@ const (
_PersistOffset = 5 * time.Second
// Rebalance interval
- _RebalanceInterval = 10 * time.Second
+ _RebalanceInterval = 20 * time.Second
)
var (
@@ -544,7 +544,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string,
cmd *remote.RemotingC
// PullMessage with sync
func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string,
request *PullMessageRequestHeader) (*primitive.PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
- res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd,
10*time.Second)
+ res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd,
30*time.Second)
if err != nil {
return nil, err
}
diff --git a/internal/request.go b/internal/request.go
index 3e477ea..5438790 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -210,10 +210,11 @@ func (request *PullMessageRequestHeader) Encode()
map[string]string {
maps["maxMsgNums"] = fmt.Sprintf("%d", request.MaxMsgNums)
maps["sysFlag"] = fmt.Sprintf("%d", request.SysFlag)
maps["commitOffset"] = fmt.Sprintf("%d", request.CommitOffset)
- maps["suspendTimeoutMillis"] = fmt.Sprintf("%d",
request.SuspendTimeoutMillis)
+ maps["suspendTimeoutMillis"] = fmt.Sprintf("%d",
request.SuspendTimeoutMillis/time.Millisecond)
maps["subscription"] = request.SubExpression
maps["subVersion"] = fmt.Sprintf("%d", request.SubVersion)
maps["expressionType"] = request.ExpressionType
+
return maps
}