panszobe opened a new issue, #949:
URL: https://github.com/apache/pulsar-client-go/issues/949

   Following PR #938, we used the master version (v0.9.1-0.20230117072740-d9b18d0690c1) to consume messages
with EnableBatchIndexAcknowledgment set to true, but **consume performance drops by about 2/3** (see the table below).
   
   The test situation is as follows:
   The topic has 5 partitions; the producer writes at 20 MB/s, 300000 rows/s.
   
   Consumer results:
   | SDK Version  | Enable Batch Index Ack | Consumer Instances | Consume Rate |
   | ------------- | ------------- | ------------- | ------------- |
   | v0.9.1-0.20230117072740-d9b18d0690c1 | Yes  | 3 | 100000 rows/s  |
   | v0.9.1-0.20230117072740-d9b18d0690c1 | No  | 3 | 300000+ rows/s  |
   
   
   Profiling with pprof, we found that `internal.(*connection).internalSendRequest` and
`pulsar.(*partitionConsumer).internalAck` consume far more resources when
EnableBatchIndexAcknowledgment is set to true.
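
For anyone reproducing this, a minimal sketch of capturing a CPU profile with the standard `runtime/pprof` package is shown here. `captureCPUProfile` and `busyLoop` are hypothetical names; `busyLoop` only stands in for the real consume-and-ack loop, which is not part of this report.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
)

// captureCPUProfile runs work while the CPU profiler is active and returns
// the raw profile bytes. Written to a file, they can be inspected with
// `go tool pprof`.
func captureCPUProfile(work func()) ([]byte, error) {
	var buf bytes.Buffer
	if err := pprof.StartCPUProfile(&buf); err != nil {
		return nil, err
	}
	work()
	pprof.StopCPUProfile() // flushes the profile into buf
	return buf.Bytes(), nil
}

// busyLoop is a hypothetical stand-in for the consumer's receive/ack loop.
func busyLoop() {
	sum := 0
	for i := 0; i < 50_000_000; i++ {
		sum += i % 7
	}
	_ = sum
}

func main() {
	profile, err := captureCPUProfile(busyLoop)
	if err != nil {
		panic(err)
	}
	fmt.Printf("captured %d bytes of CPU profile\n", len(profile))
}
```

Comparing two such profiles, one with batch index ack enabled and one without, is one way to see which functions account for the difference.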
   
   Review the code:
   ```
   func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error {
        if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
                pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
                return errors.New("consumer state is closed")
        }
   
        if cmid, ok := toChunkedMessageID(msgID); ok {
                return pc.unAckChunksTracker.ack(cmid)
        }
   
        trackingID, ok := toTrackingMessageID(msgID)
        if !ok {
                return errors.New("failed to convert trackingMessageID")
        }
   
        ackReq := new(ackRequest)
        ackReq.doneCh = make(chan struct{})
        ackReq.ackType = individualAck
        if !trackingID.Undefined() && trackingID.ack() {
                pc.metrics.AcksCounter.Inc()
                pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
                ackReq.msgID = trackingID
                // send ack request to eventsCh
                pc.eventsCh <- ackReq
   
                if withResponse {
                        <-ackReq.doneCh
                }
   
                pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
        } else if pc.options.enableBatchIndexAck {
                ackReq.msgID = trackingID
                pc.eventsCh <- ackReq
        }
   
        if withResponse {
                return ackReq.err
        }
        return nil
   }
   ```
   
   The problem may be that partitionConsumer sends an ack request to the *Pulsar
Server* for every MessageID, without waiting for ackTracker to report that all
messages of a batch have been acked. This produces far more ack requests than
when BatchIndexAck is disabled, and performance drops because of the extra
request processing. The backlog keeps growing, and the consumers cannot catch
up with the production rate.
   
   
   So enableBatchIndexAck should either follow the previous processing method,
or another approach is needed.
   
   
   @BewareMyPower Thank you for developing this feature. Could you take a look
at this problem? Thanks a lot!
   


