BewareMyPower commented on code in PR #903:
URL: https://github.com/apache/pulsar-client-go/pull/903#discussion_r1058327748


##########
pulsar/consumer_test.go:
##########
@@ -731,6 +732,161 @@ func TestConsumerAck(t *testing.T) {
        }
 }
 
+func TestConsumerNoBatchCumulativeAck(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       ctx := context.Background()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+               // disable batching
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "sub-1",
+               Type:             Exclusive,
+       })
+       assert.Nil(t, err)
+
+       const N = 100
+
+       for i := 0; i < N; i++ {
+               if _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       for i := 0; i < N; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, fmt.Sprintf("msg-content-%d", i), 
string(msg.Payload()))
+
+               if i == N/2-1 {
+                       // cumulative acks the first half of messages
+                       consumer.AckCumulative(msg)
+               }
+       }
+
+       consumer.Close()
+
+       // Subscribe again
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "sub-1",
+               Type:             Exclusive,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // We should only receive the 2nd half of messages
+       for i := N / 2; i < N; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, fmt.Sprintf("msg-content-%d", i), 
string(msg.Payload()))
+
+               consumer.Ack(msg)
+       }
+}
+
+func TestConsumerBatchCumulativeAck(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       ctx := context.Background()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "sub-1",
+               Type:             Exclusive,
+       })
+       assert.Nil(t, err)
+
+       const N = 100
+
+       // send a batch
+       wg := sync.WaitGroup{}
+       for i := 0; i < N; i++ {
+               wg.Add(1)
+               producer.SendAsync(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("msg-content-%d", i))},
+                       func(id MessageID, producerMessage *ProducerMessage, e 
error) {
+                               assert.NoError(t, e)
+                               wg.Done()
+                       })
+       }
+       wg.Wait()
+
+       err = producer.Flush()
+       assert.NoError(t, err)
+
+       // send another batch
+       wg = sync.WaitGroup{}
+       for i := N; i < 2*N; i++ {
+               wg.Add(1)
+               producer.SendAsync(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("msg-content-%d", i))},
+                       func(id MessageID, producerMessage *ProducerMessage, e 
error) {
+                               assert.NoError(t, e)
+                               wg.Done()
+                       })
+       }
+       wg.Wait()
+
+       for i := 0; i < 2*N; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, fmt.Sprintf("msg-content-%d", i), 
string(msg.Payload()))
+
+               if i == N-1 {
+                       // cumulative acks the first half of messages
+                       consumer.AckCumulative(msg)
+               }

Review Comment:
   I think you can test the case `i == N` to cover the following case
   
   ```
           } else if !trackingID.tracker.hasPrevBatchAcked() {
                   // get previous batch message id
                   ackReq.msgID = trackingID.prev()
                   trackingID.tracker.setPrevBatchAcked()
   ```
   
   Not sure if there is a way to reduce duplicated code. Maybe you can create 
another consumer to call `AckCumulative` when `i == N`?
   
   ```golang
   if i == N - 1 {
       consumer.AckCumulative(msg)
   } else if i == N {
       consumer2.AckCumulative(msg)
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to