shibd commented on code in PR #1301:
URL: https://github.com/apache/pulsar-client-go/pull/1301#discussion_r1827765599


##########
pulsar/consumer.go:
##########
@@ -305,8 +325,26 @@ type Consumer interface {
        Ack(Message) error
 
        // AckID the consumption of a single message, identified by its 
MessageID
+       // When `EnableBatchIndexAcknowledgment` is false, if a message ID 
represents a message in the batch,
+       // it will not be actually acknowledged by broker until all messages in 
that batch are acknowledged via
+       // `AckID` or `AckIDList`.
        AckID(MessageID) error
 
+       // AckIDList the consumption of a list of messages, identified by their 
MessageIDs
+       //
+       // This method should be used when `AckWithResponse` is true. 
Otherwise, it will be equivalent with calling
+       // `AckID` on each message ID in the list.
+       //
+       // If there are some invalid message IDs in the list, this method won't 
return an error for them, only error logs
+       // will be printed because retrying acknowledging them will not make a 
difference. Message IDs retrieved from
+       // Message objects are always valid.
+       //
+       // If the consumer has subscribed multiple topics or partitions, since 
each topic is associated with an ACK request,
+       // `AckError` will be returned if there are errors because some message 
IDs might succeed while others might fail.
+       // If you're sure that only 1 message is subscribed, you should treat 
all message IDs as failed when it returns an

Review Comment:
   > If you're sure that only 1 message is subscribed, you should treat all 
message IDs as failed when it returns an error
   
   Why is this logic needed? We could keep it simple and return `AckError` even 
if only one topic has an error.



##########
pulsar/consumer_multitopic.go:
##########
@@ -167,6 +167,47 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error {
        return mid.consumer.AckID(msgID)
 }
 
+func (c *multiTopicConsumer) AckIDList(msgIDs []MessageID) error {
+       return ackIDListFromMultiTopics(c.log, msgIDs, func(msgID MessageID) 
(acker, error) {
+               if !checkMessageIDType(msgID) {
+                       return nil, errors.New("invalid message id type %T")

Review Comment:
   ```suggestion
                        return nil, fmt.Errorf("invalid message id type %T", 
msgID)
   ```



##########
pulsar/consumer_multitopic.go:
##########
@@ -167,6 +167,47 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error {
        return mid.consumer.AckID(msgID)
 }
 
+func (c *multiTopicConsumer) AckIDList(msgIDs []MessageID) error {
+       return ackIDListFromMultiTopics(c.log, msgIDs, func(msgID MessageID) 
(acker, error) {
+               if !checkMessageIDType(msgID) {
+                       return nil, errors.New("invalid message id type %T")
+               }
+               if mid := toTrackingMessageID(msgID); mid != nil && 
mid.consumer != nil {
+                       return mid.consumer, nil
+               }
+               return nil, errors.New("consumer is nil")
+       })
+}
+
+func ackIDListFromMultiTopics(log log.Logger, msgIDs []MessageID, findConsumer 
func(MessageID) (acker, error)) error {
+       consumerToMsgIDs := make(map[acker][]MessageID)
+       for _, msgID := range msgIDs {
+               if consumer, err := findConsumer(msgID); err == nil {
+                       consumerToMsgIDs[consumer] = 
append(consumerToMsgIDs[consumer], msgID)
+               } else {
+                       log.Warnf("Can not find consumer for %v", msgID)
+               }
+       }
+
+       ackError := make(map[string]*TopicAckError)
+       for consumer, ids := range consumerToMsgIDs {
+               if err := consumer.AckIDList(ids); err != nil {
+                       ackError[consumer.Topic()] = &TopicAckError{
+                               Err:    err,
+                               MsgIDs: ids,
+                       }
+               }
+       }
+       if len(ackError) == 0 {
+               return nil
+       } else if len(ackError) == 1 {
+               for _, topicAckError := range ackError {

Review Comment:
   As I commented on the API, maybe we could keep it simple and consistently 
return `AckError`.



##########
pulsar/consumer_multitopic.go:
##########
@@ -167,6 +167,47 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error {
        return mid.consumer.AckID(msgID)
 }
 
+func (c *multiTopicConsumer) AckIDList(msgIDs []MessageID) error {
+       return ackIDListFromMultiTopics(c.log, msgIDs, func(msgID MessageID) 
(acker, error) {
+               if !checkMessageIDType(msgID) {
+                       return nil, errors.New("invalid message id type %T")
+               }
+               if mid := toTrackingMessageID(msgID); mid != nil && 
mid.consumer != nil {
+                       return mid.consumer, nil
+               }
+               return nil, errors.New("consumer is nil")
+       })
+}
+
+func ackIDListFromMultiTopics(log log.Logger, msgIDs []MessageID, findConsumer 
func(MessageID) (acker, error)) error {
+       consumerToMsgIDs := make(map[acker][]MessageID)
+       for _, msgID := range msgIDs {
+               if consumer, err := findConsumer(msgID); err == nil {
+                       consumerToMsgIDs[consumer] = 
append(consumerToMsgIDs[consumer], msgID)
+               } else {
+                       log.Warnf("Can not find consumer for %v", msgID)

Review Comment:
   Do we need to include this type of error in `AckError` as well?



##########
pulsar/consumer_multitopic.go:
##########
@@ -167,6 +167,47 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) error {
        return mid.consumer.AckID(msgID)
 }
 
+func (c *multiTopicConsumer) AckIDList(msgIDs []MessageID) error {
+       return ackIDListFromMultiTopics(c.log, msgIDs, func(msgID MessageID) 
(acker, error) {
+               if !checkMessageIDType(msgID) {
+                       return nil, errors.New("invalid message id type %T")
+               }
+               if mid := toTrackingMessageID(msgID); mid != nil && 
mid.consumer != nil {
+                       return mid.consumer, nil
+               }
+               return nil, errors.New("consumer is nil")
+       })
+}
+
+func ackIDListFromMultiTopics(log log.Logger, msgIDs []MessageID, findConsumer 
func(MessageID) (acker, error)) error {
+       consumerToMsgIDs := make(map[acker][]MessageID)
+       for _, msgID := range msgIDs {
+               if consumer, err := findConsumer(msgID); err == nil {
+                       consumerToMsgIDs[consumer] = 
append(consumerToMsgIDs[consumer], msgID)
+               } else {
+                       log.Warnf("Can not find consumer for %v", msgID)

Review Comment:
   
https://github.com/apache/pulsar-client-go/blob/6ffc7d283fbdb43e11d16bd643a7f28544cd9d1c/pulsar/consumer_impl.go#L564-L571
   
   For example, if `checkMsgIDPartition` returns an error, we need to tell the 
user.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to