shibd commented on code in PR #1301:
URL: https://github.com/apache/pulsar-client-go/pull/1301#discussion_r1825572134


##########
pulsar/consumer.go:
##########
@@ -307,6 +307,14 @@ type Consumer interface {
        // AckID the consumption of a single message, identified by its MessageID
        AckID(MessageID) error
 
+       // AckIDList the consumption of a list of messages, identified by their MessageIDs
+       // Returns a map of MessageID to error, the keys are the MessageIDs that failed to be acknowledged
+       // NOTE: When EnableBatchIndexAcknowledgment is false, if a message ID represents a message in the batch,
+       // it will not be actually acknowledged by broker until all messages in that batch are acknowledged via
+       // the AckID or AckIDList method.
+       // However, in this case, no error will be returned for that message ID even if AckWithResponse is true.
+       AckIDList([]MessageID) map[MessageID]error

Review Comment:
   > I designed the map[MessageID]error as the return value type because I want
   > to handle the case that partial messages in the batch are acknowledged. But
   > since now, they are treated as successful result, it might not be meaningful to
   > return a map now.
   
   There are two types of errors here. 
   1. If the ID is invalid, the user should correct it and retry.
   2. If the broker returns an error, the entire batch should fail, right?
   
   So I'm +1 for this suggestion:
   
   > Or should we just fail the whole ids if there is any invalid message ID?



##########
pulsar/consumer_partition.go:
##########
@@ -683,6 +688,83 @@ func (pc *partitionConsumer) AckID(msgID MessageID) error {
        return pc.ackID(msgID, false)
 }
 
+func (pc *partitionConsumer) AckIDList(msgIDs []MessageID) map[MessageID]error {
+       errorMap := make(map[MessageID]error)
+       if !pc.options.ackWithResponse {
+               for _, msgID := range msgIDs {
+                       if err := pc.ackID(msgID, false); err != nil {
+                               errorMap[msgID] = err
+                       }
+               }
+               return errorMap
+       }
+
+       chunkedMsgIDs := make([]*chunkMessageID, 0) // we need to remove them after acknowledging
+       pendingAcks := make(map[position]*bitset.BitSet)
+
+       // They might be complete after the whole for loop
+       incompleteTrackingIDs := make([]*trackingMessageID, 0)
+       for _, msgID := range msgIDs {
+               if msgID.PartitionIdx() != pc.partitionIdx {
+                       errorMap[msgID] = fmt.Errorf("inconsistent partition index %v (current: %v)",
+                               msgID.PartitionIdx(), pc.partitionIdx)
+               } else if msgID.BatchIdx() >= 0 && msgID.BatchSize() > 0 &&
+                       msgID.BatchIdx() >= msgID.BatchSize() {
+                       errorMap[msgID] = fmt.Errorf("invalid batch index %v (size: %v)", msgID.BatchIdx(), msgID.BatchSize())
+               } else {
+                       switch convertedMsgID := msgID.(type) {
+                       case *trackingMessageID:
+                               if convertedMsgID.ack() {
+                                       pendingAcks[newPosition(msgID)] = nil
+                               } else {
+                                       incompleteTrackingIDs = append(incompleteTrackingIDs, convertedMsgID)
+                               }
+                       case *chunkMessageID:
+                               for _, id := range pc.unAckChunksTracker.get(convertedMsgID) {
+                                       pendingAcks[newPosition(id)] = nil
+                               }
+                               chunkedMsgIDs = append(chunkedMsgIDs, convertedMsgID)
+                       case *messageID:
+                               pendingAcks[newPosition(msgID)] = nil
+                       default:
+                               errorMap[msgID] = fmt.Errorf("invalid message id type %T", msgID)
+                       }
+               }
+       }
+
+       if pc.options.enableBatchIndexAck {

Review Comment:
   A non-blocking comment for this PR:
   
   If the `msgId` belongs to a batch, can we record it in `ackGroupTracking`
   even when `batchIndexAck` is not enabled?
   
   That way, even if the batch has not yet been acknowledged to the broker, we
   can filter out an already-acknowledged message the next time it is received
   and avoid delivering it to the user twice.
   
   This scenario is described in the issue: 
https://github.com/apache/pulsar/issues/23436
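
To illustrate the idea, here is a minimal sketch of such a record. It is not the client's real tracker: `position`, `batchAckTracker`, and its methods are hypothetical names, and the sketch ignores concurrency and eviction.

```go
package main

import "fmt"

// position is a hypothetical (ledger, entry) key identifying one batch.
type position struct {
	ledgerID int64
	entryID  int64
}

// batchAckTracker is a hypothetical client-side record of batch indexes that
// the user has acknowledged while the batch as a whole is still pending.
// It is not safe for concurrent use.
type batchAckTracker struct {
	acked map[position]map[int32]struct{}
}

func newBatchAckTracker() *batchAckTracker {
	return &batchAckTracker{acked: make(map[position]map[int32]struct{})}
}

// add records that the user acknowledged one message of a batch.
func (t *batchAckTracker) add(pos position, batchIdx int32) {
	if t.acked[pos] == nil {
		t.acked[pos] = make(map[int32]struct{})
	}
	t.acked[pos][batchIdx] = struct{}{}
}

// isDuplicate reports whether a received message was already acknowledged by
// the user and should therefore not be delivered again.
func (t *batchAckTracker) isDuplicate(pos position, batchIdx int32) bool {
	_, ok := t.acked[pos][batchIdx]
	return ok
}

// remove drops the record once the whole batch has been acknowledged.
func (t *batchAckTracker) remove(pos position) {
	delete(t.acked, pos)
}

func main() {
	tracker := newBatchAckTracker()
	batch := position{ledgerID: 10, entryID: 3}

	// The user acknowledges two of four messages; the batch stays pending.
	tracker.add(batch, 0)
	tracker.add(batch, 2)

	// The broker redelivers the whole batch: filter what was already acked.
	for idx := int32(0); idx < 4; idx++ {
		if tracker.isDuplicate(batch, idx) {
			fmt.Printf("skip index %d (already acked)\n", idx)
			continue
		}
		fmt.Printf("deliver index %d\n", idx)
	}
}
```

The record only has to live until the whole batch is acknowledged, which is why `remove` is keyed by position rather than by individual index.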


