wgffgw opened a new issue, #1207:
URL: https://github.com/apache/rocketmq-client-go/issues/1207
unc (dc *defaultConsumer) lockAll() {
mqMapSet := dc.buildProcessQueueTableByBrokerName()
for broker, mqs := range mqMapSet {
if len(mqs) == 0 {
continue
}
brokerResult :=
dc.client.GetNameSrv().FindBrokerAddressInSubscribe(broker, internal.MasterId,
true)
if brokerResult == nil {
continue
}
body := &lockBatchRequestBody{
ConsumerGroup: dc.consumerGroup,
ClientId: dc.client.ClientID(),
MQs: mqs,
}
lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
set := make(map[primitive.MessageQueue]bool)
for idx := range lockedMQ {
_mq := lockedMQ[idx]
v, exist := dc.processQueueTable.Load(_mq)
if exist {
pq := v.(*processQueue)
pq.WithLock(true)
pq.UpdateLastConsumeTime()
// 这里是否要更新下锁时间,pq.UpdateLastLockTime()
}
set[_mq] = true
}
for idx := range mqs {
_mq := mqs[idx]
if !set[*_mq] {
v, exist := dc.processQueueTable.Load(_mq)
if exist {
pq := v.(*processQueue)
pq.WithLock(false)
pq.UpdateLastLockTime()
//这里都不上锁了,更新锁时间干什么?
rlog.Info("lock MessageQueue",
map[string]interface{}{
"lockOK": false,
rlog.LogKeyConsumerGroup:
dc.consumerGroup,
rlog.LogKeyMessageQueue:
_mq.String(),
})
}
}
}
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]