panszobe opened a new issue, #949:
URL: https://github.com/apache/pulsar-client-go/issues/949
Following PR #938, I used the master version
(v0.9.1-0.20230117072740-d9b18d0690c1) to consume messages with
EnableBatchIndexAcknowledgment set to true, but **consume performance drops
by about 2/3 compared with the previous behavior**.
The test setup is as follows:
The topic has 5 partitions, and the producer writes at 20 MB/s (300000 rows/s).
The consumers behave as follows:
| SDK Version | Enable Batch Index Ack | Consumer Instances | Consume Rate |
| ------------- | ------------- | ------------- | ------------- |
| v0.9.1-0.20230117072740-d9b18d0690c1 | Yes | 3 | 100000 rows/s |
| v0.9.1-0.20230117072740-d9b18d0690c1 | No | 3 | 300000+ rows/s |
Profiling with pprof shows that
`internal.(*connection).internalSendRequest` and
`pulsar.(*partitionConsumer).internalAck` consume far more resources when
EnableBatchIndexAcknowledgment is set to true.
Review the code:
```
func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error {
	if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
		pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
		return errors.New("consumer state is closed")
	}

	if cmid, ok := toChunkedMessageID(msgID); ok {
		return pc.unAckChunksTracker.ack(cmid)
	}

	trackingID, ok := toTrackingMessageID(msgID)
	if !ok {
		return errors.New("failed to convert trackingMessageID")
	}

	ackReq := new(ackRequest)
	ackReq.doneCh = make(chan struct{})
	ackReq.ackType = individualAck
	if !trackingID.Undefined() && trackingID.ack() {
		pc.metrics.AcksCounter.Inc()
		pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
		ackReq.msgID = trackingID
		// send ack request to eventsCh
		pc.eventsCh <- ackReq

		if withResponse {
			<-ackReq.doneCh
		}

		pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
	} else if pc.options.enableBatchIndexAck {
		ackReq.msgID = trackingID
		pc.eventsCh <- ackReq
	}

	if withResponse {
		return ackReq.err
	}
	return nil
}
```
The problem may be that partitionConsumer sends an ack request to the *Pulsar
server* for every MessageID instead of waiting until the ackTracker reports
that all messages of a batch have been acked. This produces far more ack
requests than with BatchIndexAck disabled, and performance drops because of
the extra request processing. The backlog keeps growing and the consumer
cannot catch up with the production rate.
So enableBatchIndexAck should either follow the previous processing method or
be handled in some other way.
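To make the suspected difference concrete, here is a rough, self-contained sketch that only counts ack requests; it does not use the real client. `batchTracker` and `countAckRequests` are hypothetical names, and the batch size of 100 is an assumed value, not something measured in the test above.

```go
package main

import "fmt"

// batchTracker is a hypothetical stand-in for the client's per-batch ack
// tracker: it records which indexes of one batch have been acked.
type batchTracker struct {
	acked     []bool
	remaining int
}

func newBatchTracker(size int) *batchTracker {
	return &batchTracker{acked: make([]bool, size), remaining: size}
}

// ack marks one index as acked and reports whether the whole batch is done.
func (t *batchTracker) ack(index int) bool {
	if !t.acked[index] {
		t.acked[index] = true
		t.remaining--
	}
	return t.remaining == 0
}

// countAckRequests returns how many ack requests would reach the broker when
// every message in `batches` batches of `batchSize` messages is acked.
// With perIndex true, each message triggers a request (the behavior this
// issue suspects); otherwise one request is sent per completed batch.
func countAckRequests(batches, batchSize int, perIndex bool) int {
	requests := 0
	for b := 0; b < batches; b++ {
		tracker := newBatchTracker(batchSize)
		for i := 0; i < batchSize; i++ {
			done := tracker.ack(i)
			if perIndex || done {
				requests++
			}
		}
	}
	return requests
}

func main() {
	// Assumed numbers: 1000 batches of 100 messages each.
	fmt.Println(countAckRequests(1000, 100, false)) // prints 1000
	fmt.Println(countAckRequests(1000, 100, true))  // prints 100000
}
```

Under these assumptions the per-index path sends one request per message, so the request count grows with the batch size, which would be consistent with the extra time seen in internalSendRequest and internalAck.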
@BewareMyPower Thank you for developing this feature. Could you take a look
at this problem? Thanks a lot!
--
This is an automated message from the Apache Git Service.