This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new bd8861c fix bug:seek offset won't work due to wrong map key type (#1184)
bd8861c is described below
commit bd8861cb04b456693af7dff9c55c8c5451cad12b
Author: muyun.cyt <[email protected]>
AuthorDate: Fri Dec 20 11:04:31 2024 +0800
fix bug:seek offset won't work due to wrong map key type (#1184)
Co-authored-by: muyun.cyt <[email protected]>
---
consumer/pull_consumer.go | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 3258779..9592061 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -208,12 +208,17 @@ func (pc *defaultPullConsumer) nextPullOffset(mq *primitive.MessageQueue, origin
if pc.SubType != Assign {
return originOffset
}
- value, exist := pc.mq2seekOffset.LoadAndDelete(mq)
+ value, exist := pc.mq2seekOffset.LoadAndDelete(*mq)
if !exist {
return originOffset
} else {
nextOffset := value.(int64)
_ = pc.updateOffset(mq, nextOffset)
+ rlog.Info("pull consumer assign new offset", map[string]interface{}{
+ "group": pc.GroupName,
+ "mq": mq,
+ "offset": nextOffset,
+ })
return nextOffset
}
}
@@ -711,7 +716,7 @@ func (pc *defaultPullConsumer) ResetOffset(topic string, table map[primitive.Mes
}
func (pc *defaultPullConsumer) SeekOffset(mq *primitive.MessageQueue, offset int64) {
- pc.mq2seekOffset.Store(mq, offset)
+ pc.mq2seekOffset.Store(*mq, offset)
rlog.Info("pull consumer seek offset", map[string]interface{}{
"mq": mq,
"offset": offset,
@@ -881,6 +886,8 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
pullRequest.SysFlag = clearCommitOffsetFlag(pullRequest.SysFlag)
}
+ rlog.Debug(fmt.Sprintf("defaultPullConsumer pull message from broker: %s, request: %+v", brokerResult.BrokerAddr, pullRequest), nil)
+
result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
if err != nil {
rlog.Warning("defaultPullConsumer pull message from broker error", map[string]interface{}{