This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d257b0  Support grouping ACK requests by time and size (#957)
7d257b0 is described below

commit 7d257b01b1216e0009495a4aaf83b1716ba458a9
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Feb 22 08:38:22 2023 +0800

    Support grouping ACK requests by time and size (#957)
    
    * Support grouping ACK requests by time and size
    
    Fixes #949
    
    ### Motivation
    
    Currently the Go client does not support grouping ACK requests, so each
    time `Ack` (or similar APIs) is called, an ACK request will be sent,
    which could degrade performance. We need to support configuring
    the time and size to cache `MessageID` before sending ACK requests.
    
    ### Modifications
    - Add an `AckGroupingOptions` field to `ConsumerOptions`, when it's nil,
      use 100ms as the max time and 1000 as the max size.
    - Add an `ackGroupingTracker` interface to support grouping ACK requests
    - When `AckWithResponse` is false, add the `MessageID` instance to
      the tracker instead of sending the request to `eventsCh`.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    This change added tests and can be verified as follows:
      - Added `ack_grouping_tracker_test.go` to verify `ackGroupingTracker`
        in various cases
      - The consumer side change can be covered by existing tests because
        the default `AckGroupingOptions` config is
        `{ MaxSize: 1000, MaxTime: 100*time.Millisecond }`.
    
    * Fix flushAndClean race
    
    * Use unbuffered channel for flush operations
    
    * Apply different AckGroupingOptions and expose this config
---
 pulsar/ack_grouping_tracker.go      | 298 ++++++++++++++++++++++++++++++++++++
 pulsar/ack_grouping_tracker_test.go | 197 ++++++++++++++++++++++++
 pulsar/consumer.go                  |  21 +++
 pulsar/consumer_impl.go             |   1 +
 pulsar/consumer_partition.go        |  83 ++++++----
 pulsar/consumer_partition_test.go   |   6 +
 pulsar/consumer_test.go             |  54 +++----
 pulsar/message.go                   |  39 +++++
 8 files changed, 647 insertions(+), 52 deletions(-)

diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go
new file mode 100644
index 0000000..dbc70f5
--- /dev/null
+++ b/pulsar/ack_grouping_tracker.go
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "time"
+
+       "github.com/bits-and-blooms/bitset"
+)
+
+type ackGroupingTracker interface {
+       add(id MessageID)
+
+       addCumulative(id MessageID)
+
+       isDuplicate(id MessageID) bool
+
+       flush()
+
+       flushAndClean()
+
+       close()
+}
+
+type ackFlushType int
+
+const (
+       flushOnly ackFlushType = iota
+       flushAndClean
+       flushAndClose
+)
+
+func newAckGroupingTracker(options *AckGroupingOptions,
+       ackIndividual func(id MessageID),
+       ackCumulative func(id MessageID)) ackGroupingTracker {
+       if options == nil {
+               options = &AckGroupingOptions{
+                       MaxSize: 1000,
+                       MaxTime: 100 * time.Millisecond,
+               }
+       }
+
+       if options.MaxSize <= 1 {
+               return &immediateAckGroupingTracker{
+                       ackIndividual: ackIndividual,
+                       ackCumulative: ackCumulative,
+               }
+       }
+
+       c := &cachedAcks{
+               singleAcks:        make([]MessageID, options.MaxSize),
+               pendingAcks:       make(map[int64]*bitset.BitSet),
+               lastCumulativeAck: EarliestMessageID(),
+               ackIndividual:     ackIndividual,
+               ackCumulative:     ackCumulative,
+               ackList: func(ids []MessageID) {
+                       // TODO: support ack a list of MessageIDs
+                       for _, id := range ids {
+                               ackIndividual(id)
+                       }
+               },
+       }
+
+       timeout := time.NewTicker(time.Hour)
+       if options.MaxTime > 0 {
+               timeout = time.NewTicker(options.MaxTime)
+       } else {
+               timeout.Stop()
+       }
+       t := &timedAckGroupingTracker{
+               ackIndividualCh:   make(chan MessageID),
+               ackCumulativeCh:   make(chan MessageID),
+               duplicateIDCh:     make(chan MessageID),
+               duplicateResultCh: make(chan bool),
+               flushCh:           make(chan ackFlushType),
+               waitFlushCh:       make(chan bool),
+       }
+       go func() {
+               for {
+                       select {
+                       case id := <-t.ackIndividualCh:
+                               if c.addAndCheckIfFull(id) {
+                                       c.flushIndividualAcks()
+                                       if options.MaxTime > 0 {
+                                               timeout.Reset(options.MaxTime)
+                                       }
+                               }
+                       case id := <-t.ackCumulativeCh:
+                               c.tryUpdateLastCumulativeAck(id)
+                               if options.MaxTime <= 0 {
+                                       c.flushCumulativeAck()
+                               }
+                       case id := <-t.duplicateIDCh:
+                               t.duplicateResultCh <- c.isDuplicate(id)
+                       case <-timeout.C:
+                               c.flush()
+                       case ackFlushType := <-t.flushCh:
+                               timeout.Stop()
+                               c.flush()
+                               if ackFlushType == flushAndClean {
+                                       c.clean()
+                               }
+                               t.waitFlushCh <- true
+                               if ackFlushType == flushAndClose {
+                                       return
+                               }
+                       }
+               }
+       }()
+       return t
+}
+
+type immediateAckGroupingTracker struct {
+       ackIndividual func(id MessageID)
+       ackCumulative func(id MessageID)
+}
+
+func (i *immediateAckGroupingTracker) add(id MessageID) {
+       i.ackIndividual(id)
+}
+
+func (i *immediateAckGroupingTracker) addCumulative(id MessageID) {
+       i.ackCumulative(id)
+}
+
+func (i *immediateAckGroupingTracker) isDuplicate(id MessageID) bool {
+       return false
+}
+
+func (i *immediateAckGroupingTracker) flush() {
+}
+
+func (i *immediateAckGroupingTracker) flushAndClean() {
+}
+
+func (i *immediateAckGroupingTracker) close() {
+}
+
+type cachedAcks struct {
+       singleAcks []MessageID
+       index      int
+
+       // Key is the hash code of the ledger id and the entry id,
+       // Value is the bit set that represents which messages are acknowledged 
if the entry stores a batch.
+       // The bit 1 represents the message has not been acknowledged, i.e. the 
bits "111" represents all messages
+       // in the batch whose batch size is 3 are not acknowledged.
+       // After the 1st message (i.e. batch index is 0) is acknowledged, the 
bits will become "011".
+       // Value is nil if the entry represents a single message.
+       pendingAcks map[int64]*bitset.BitSet
+
+       lastCumulativeAck     MessageID
+       cumulativeAckRequired bool
+
+       ackIndividual func(id MessageID)
+       ackCumulative func(id MessageID)
+       ackList       func(ids []MessageID)
+}
+
+func (t *cachedAcks) addAndCheckIfFull(id MessageID) bool {
+       t.singleAcks[t.index] = id
+       t.index++
+       key := messageIDHash(id)
+       ackSet, found := t.pendingAcks[key]
+       if !found {
+               if messageIDIsBatch(id) {
+                       ackSet = bitset.New(uint(id.BatchSize()))
+                       for i := 0; i < int(id.BatchSize()); i++ {
+                               ackSet.Set(uint(i))
+                       }
+                       t.pendingAcks[key] = ackSet
+               } else {
+                       t.pendingAcks[key] = nil
+               }
+       }
+       if ackSet != nil {
+               ackSet.Clear(uint(id.BatchIdx()))
+       }
+       return t.index == len(t.singleAcks)
+}
+
+func (t *cachedAcks) tryUpdateLastCumulativeAck(id MessageID) {
+       if messageIDCompare(t.lastCumulativeAck, id) < 0 {
+               t.lastCumulativeAck = id
+               t.cumulativeAckRequired = true
+       }
+}
+
+func (t *cachedAcks) isDuplicate(id MessageID) bool {
+       if messageIDCompare(t.lastCumulativeAck, id) >= 0 {
+               return true
+       }
+       ackSet, found := t.pendingAcks[messageIDHash(id)]
+       if !found {
+               return false
+       }
+       if ackSet == nil || !messageIDIsBatch(id) {
+               // NOTE: should we panic when ackSet != nil and 
messageIDIsBatch(id) is false?
+               return true
+       }
+       // 0 represents the message has been acknowledged
+       return !ackSet.Test(uint(id.BatchIdx()))
+}
+
+func (t *cachedAcks) flushIndividualAcks() {
+       if t.index > 0 {
+               t.ackList(t.singleAcks[0:t.index])
+               for _, id := range t.singleAcks[0:t.index] {
+                       key := messageIDHash(id)
+                       ackSet, found := t.pendingAcks[key]
+                       if !found {
+                               continue
+                       }
+                       if ackSet == nil {
+                               delete(t.pendingAcks, key)
+                       } else {
+                               ackSet.Clear(uint(id.BatchIdx()))
+                               if ackSet.None() { // all messages have been 
acknowledged
+                                       delete(t.pendingAcks, key)
+                               }
+                       }
+                       delete(t.pendingAcks, messageIDHash(id))
+               }
+               t.index = 0
+       }
+}
+
+func (t *cachedAcks) flushCumulativeAck() {
+       if t.cumulativeAckRequired {
+               t.ackCumulative(t.lastCumulativeAck)
+               t.cumulativeAckRequired = false
+       }
+}
+
+func (t *cachedAcks) flush() {
+       t.flushIndividualAcks()
+       t.flushCumulativeAck()
+}
+
+func (t *cachedAcks) clean() {
+       maxSize := len(t.singleAcks)
+       t.singleAcks = make([]MessageID, maxSize)
+       t.index = 0
+       t.pendingAcks = make(map[int64]*bitset.BitSet)
+       t.lastCumulativeAck = EarliestMessageID()
+       t.cumulativeAckRequired = false
+}
+
+type timedAckGroupingTracker struct {
+       ackIndividualCh   chan MessageID
+       ackCumulativeCh   chan MessageID
+       duplicateIDCh     chan MessageID
+       duplicateResultCh chan bool
+       flushCh           chan ackFlushType
+       waitFlushCh       chan bool
+}
+
+func (t *timedAckGroupingTracker) add(id MessageID) {
+       t.ackIndividualCh <- id
+}
+
+func (t *timedAckGroupingTracker) addCumulative(id MessageID) {
+       t.ackCumulativeCh <- id
+}
+
+func (t *timedAckGroupingTracker) isDuplicate(id MessageID) bool {
+       t.duplicateIDCh <- id
+       return <-t.duplicateResultCh
+}
+
+func (t *timedAckGroupingTracker) flush() {
+       t.flushCh <- flushOnly
+       <-t.waitFlushCh
+}
+
+func (t *timedAckGroupingTracker) flushAndClean() {
+       t.flushCh <- flushAndClean
+       <-t.waitFlushCh
+}
+
+func (t *timedAckGroupingTracker) close() {
+       t.flushCh <- flushAndClose
+       <-t.waitFlushCh
+}
diff --git a/pulsar/ack_grouping_tracker_test.go 
b/pulsar/ack_grouping_tracker_test.go
new file mode 100644
index 0000000..d7903e8
--- /dev/null
+++ b/pulsar/ack_grouping_tracker_test.go
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "fmt"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestNoCacheTracker(t *testing.T) {
+       tests := []AckGroupingOptions{
+               {
+                       MaxSize: 0,
+                       MaxTime: 10 * time.Hour,
+               },
+               {
+                       MaxSize: 1,
+                       MaxTime: 10 * time.Hour,
+               },
+       }
+       for _, option := range tests {
+               t.Run(fmt.Sprintf("TestAckImmediately_size_%v_time_%vs", 
option.MaxSize, option.MaxTime.Seconds()),
+                       func(t *testing.T) {
+                               ledgerID0 := int64(-1)
+                               ledgerID1 := int64(-1)
+                               tracker := newAckGroupingTracker(&option,
+                                       func(id MessageID) { ledgerID0 = 
id.LedgerID() },
+                                       func(id MessageID) { ledgerID1 = 
id.LedgerID() })
+
+                               tracker.add(&messageID{ledgerID: 1})
+                               assert.Equal(t, atomic.LoadInt64(&ledgerID0), 
int64(1))
+                               tracker.addCumulative(&messageID{ledgerID: 2})
+                               assert.Equal(t, atomic.LoadInt64(&ledgerID1), 
int64(2))
+                       })
+       }
+}
+
+type mockAcker struct {
+       sync.Mutex
+       ledgerIDs          []int64
+       cumulativeLedgerID int64
+}
+
+func (a *mockAcker) ack(id MessageID) {
+       defer a.Unlock()
+       a.Lock()
+       a.ledgerIDs = append(a.ledgerIDs, id.LedgerID())
+}
+
+func (a *mockAcker) ackCumulative(id MessageID) {
+       atomic.StoreInt64(&a.cumulativeLedgerID, id.LedgerID())
+}
+
+func (a *mockAcker) getLedgerIDs() []int64 {
+       defer a.Unlock()
+       a.Lock()
+       return a.ledgerIDs
+}
+
+func (a *mockAcker) getCumulativeLedgerID() int64 {
+       return atomic.LoadInt64(&a.cumulativeLedgerID)
+}
+
+func (a *mockAcker) reset() {
+       a.ledgerIDs = make([]int64, 0)
+       a.cumulativeLedgerID = int64(0)
+}
+
+func TestCachedTracker(t *testing.T) {
+       var acker mockAcker
+       tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3, 
MaxTime: 0},
+               func(id MessageID) { acker.ack(id) }, func(id MessageID) { 
acker.ackCumulative(id) })
+
+       tracker.add(&messageID{ledgerID: 1})
+       tracker.add(&messageID{ledgerID: 2})
+       for i := 1; i <= 2; i++ {
+               assert.True(t, tracker.isDuplicate(&messageID{ledgerID: 
int64(i)}))
+       }
+       assert.Equal(t, 0, len(acker.getLedgerIDs()))
+       tracker.add(&messageID{ledgerID: 3})
+       assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) > 0 
},
+               10*time.Millisecond, 2*time.Millisecond)
+       assert.Equal(t, []int64{1, 2, 3}, acker.getLedgerIDs())
+       for i := 1; i <= 3; i++ {
+               assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 
int64(i)}))
+       }
+
+       tracker.add(&messageID{ledgerID: 4})
+       // 4 won't be acknowledged yet because the cache is not full
+       assert.Equal(t, []int64{1, 2, 3}, acker.getLedgerIDs())
+
+       assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 5}))
+       tracker.addCumulative(&messageID{ledgerID: 5})
+       for i := 0; i <= 5; i++ {
+               assert.True(t, tracker.isDuplicate(&messageID{ledgerID: 
int64(i)}))
+       }
+       assert.Equal(t, int64(5), acker.getCumulativeLedgerID())
+       assert.False(t, tracker.isDuplicate(&messageID{ledgerID: int64(6)}))
+
+       tracker.flush()
+       assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) > 3 
},
+               10*time.Millisecond, 2*time.Millisecond)
+       assert.Equal(t, []int64{1, 2, 3, 4}, acker.getLedgerIDs())
+}
+
+func TestTimedTrackerIndividualAck(t *testing.T) {
+       var acker mockAcker
+       // MaxSize: 1000, MaxTime: 100ms
+       tracker := newAckGroupingTracker(nil, func(id MessageID) { 
acker.ack(id) }, nil)
+
+       expected := make([]int64, 0)
+       for i := 0; i < 999; i++ {
+               tracker.add(&messageID{ledgerID: int64(i)})
+               expected = append(expected, int64(i))
+       }
+       assert.Equal(t, 0, len(acker.getLedgerIDs()))
+
+       // case 1: flush because the tracker timed out
+       assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) == 
999 },
+               150*time.Millisecond, 10*time.Millisecond)
+       assert.Equal(t, expected, acker.getLedgerIDs())
+
+       // case 2: flush because cache is full
+       time.Sleep(50) // see case 3
+       acker.reset()
+       expected = append(expected, 999)
+       for i := 0; i < 1001; i++ {
+               tracker.add(&messageID{ledgerID: int64(i)})
+       }
+       assert.Equal(t, expected, acker.getLedgerIDs())
+
+       // case 3: flush will reset the timer
+       start := time.Now()
+       assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) > 
1000 },
+               150*time.Millisecond, 10*time.Millisecond)
+       elapsed := time.Since(start)
+       assert.GreaterOrEqual(t, elapsed, int64(100), "elapsed", elapsed)
+       assert.Equal(t, append(expected, 1000), acker.getLedgerIDs())
+}
+
+func TestTimedTrackerCumulativeAck(t *testing.T) {
+       var acker mockAcker
+       // MaxTime is 100ms
+       tracker := newAckGroupingTracker(nil, nil, func(id MessageID) { 
acker.ackCumulative(id) })
+
+       // case 1: flush because of the timeout
+       tracker.addCumulative(&messageID{ledgerID: 1})
+       assert.NotEqual(t, int64(1), acker.getCumulativeLedgerID())
+       assert.Eventually(t, func() bool { return acker.getCumulativeLedgerID() 
== int64(1) },
+               150*time.Millisecond, 10*time.Millisecond)
+       assert.Equal(t, int64(1), acker.getCumulativeLedgerID())
+
+       // case 2: flush manually
+       tracker.addCumulative(&messageID{ledgerID: 2})
+       tracker.flush()
+       assert.Equal(t, int64(2), acker.getCumulativeLedgerID())
+
+       // case 3: older MessageID cannot be acknowledged
+       tracker.addCumulative(&messageID{ledgerID: 1})
+       tracker.flush()
+       assert.Equal(t, int64(2), acker.getCumulativeLedgerID())
+}
+
+func TestTimedTrackerIsDuplicate(t *testing.T) {
+       tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id 
MessageID) {})
+
+       tracker.add(messageID{batchIdx: 0, batchSize: 3})
+       tracker.add(messageID{batchIdx: 2, batchSize: 3})
+       assert.True(t, tracker.isDuplicate(messageID{batchIdx: 0, batchSize: 
3}))
+       assert.False(t, tracker.isDuplicate(messageID{batchIdx: 1, batchSize: 
3}))
+       assert.True(t, tracker.isDuplicate(messageID{batchIdx: 2, batchSize: 
3}))
+
+       tracker.flush()
+       assert.False(t, tracker.isDuplicate(messageID{batchIdx: 0, batchSize: 
3}))
+       assert.False(t, tracker.isDuplicate(messageID{batchIdx: 1, batchSize: 
3}))
+       assert.False(t, tracker.isDuplicate(messageID{batchIdx: 2, batchSize: 
3}))
+}
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 8bae57d..9576d7a 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -80,6 +80,20 @@ type DLQPolicy struct {
        RetryLetterTopic string
 }
 
+// AckGroupingOptions controls how to group ACK requests
+// If MaxSize is 0 or 1, any ACK request will be sent immediately.
+// Otherwise, the ACK requests will be cached until one of the following 
conditions is met:
+// 1. There are `MaxSize` pending ACK requests.
+// 2. `MaxTime` is greater than 0 and ACK requests have been 
cached for `MaxTime`.
+// Specially, for cumulative acknowledgment, only the latest ACK is cached and 
it will only be sent after `MaxTime`.
+type AckGroupingOptions struct {
+       // The maximum number of ACK requests to cache
+       MaxSize uint32
+
+       // The maximum time to cache ACK requests
+       MaxTime time.Duration
+}
+
 // ConsumerOptions is used to configure and create instances of Consumer.
 type ConsumerOptions struct {
        // Topic specifies the topic this consumer will subscribe on.
@@ -215,6 +229,13 @@ type ConsumerOptions struct {
        // Enable or disable batch index acknowledgment. To enable this 
feature, ensure batch index acknowledgment
        // is enabled on the broker side. (default: false)
        EnableBatchIndexAcknowledgment bool
+
+       // Controls how to group ACK requests, the default value is nil, which 
means:
+       // MaxSize: 1000
+       // MaxTime: 100*time.Millisecond
+       // NOTE: This option does not work if AckWithResponse is true
+       //      because there are only synchronous APIs for acknowledgment
+       AckGroupingOptions *AckGroupingOptions
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index bf136c8..8ee1822 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -398,6 +398,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                                autoAckIncompleteChunk:      
c.options.AutoAckIncompleteChunk,
                                consumerEventListener:       
c.options.EventListener,
                                enableBatchIndexAck:         
c.options.EnableBatchIndexAcknowledgment,
+                               ackGroupingOptions:          
c.options.AckGroupingOptions,
                        }
                        cons, err := newPartitionConsumer(c, c.client, opts, 
c.messageCh, c.dlq, c.metrics)
                        ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e723f8a..95a8d32 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -116,6 +116,7 @@ type partitionConsumerOpts struct {
        // in failover mode, this callback will be called when consumer change
        consumerEventListener ConsumerEventListener
        enableBatchIndexAck   bool
+       ackGroupingOptions    *AckGroupingOptions
 }
 
 type ConsumerEventListener interface {
@@ -167,6 +168,7 @@ type partitionConsumer struct {
 
        chunkedMsgCtxMap   *chunkedMsgCtxMap
        unAckChunksTracker *unAckChunksTracker
+       ackGroupingTracker ackGroupingTracker
 }
 
 func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
@@ -310,6 +312,9 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
        pc.availablePermits = &availablePermits{pc: pc}
        pc.chunkedMsgCtxMap = 
newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
        pc.unAckChunksTracker = newUnAckChunksTracker(pc)
+       pc.ackGroupingTracker = 
newAckGroupingTracker(options.ackGroupingOptions,
+               func(id MessageID) { pc.sendIndividualAck(id) },
+               func(id MessageID) { pc.sendCumulativeAck(id) })
        pc.setConsumerState(consumerInit)
        pc.log = client.log.SubLogger(log.Fields{
                "name":         pc.name,
@@ -467,30 +472,35 @@ func (pc *partitionConsumer) ackID(msgID MessageID, 
withResponse bool) error {
                return errors.New("failed to convert trackingMessageID")
        }
 
-       ackReq := new(ackRequest)
-       ackReq.doneCh = make(chan struct{})
-       ackReq.ackType = individualAck
        if !trackingID.Undefined() && trackingID.ack() {
                pc.metrics.AcksCounter.Inc()
                
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano())
 / 1.0e9)
-               ackReq.msgID = trackingID
-               // send ack request to eventsCh
-               pc.eventsCh <- ackReq
-
-               if withResponse {
-                       <-ackReq.doneCh
-               }
-
-               pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
-       } else if pc.options.enableBatchIndexAck {
-               ackReq.msgID = trackingID
-               pc.eventsCh <- ackReq
+       } else if !pc.options.enableBatchIndexAck {
+               return nil
        }
 
+       var ackReq *ackRequest
        if withResponse {
-               return ackReq.err
+               ackReq := pc.sendIndividualAck(&trackingID)
+               <-ackReq.doneCh
+       } else {
+               pc.ackGroupingTracker.add(&trackingID)
        }
-       return nil
+       pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
+       if ackReq == nil {
+               return nil
+       }
+       return ackReq.err
+}
+
+func (pc *partitionConsumer) sendIndividualAck(msgID MessageID) *ackRequest {
+       ackReq := &ackRequest{
+               doneCh:  make(chan struct{}),
+               ackType: individualAck,
+               msgID:   *msgID.(*trackingMessageID),
+       }
+       pc.eventsCh <- ackReq
+       return ackReq
 }
 
 func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error {
@@ -524,14 +534,12 @@ func (pc *partitionConsumer) 
internalAckIDCumulative(msgID MessageID, withRespon
                return nil
        }
 
-       ackReq := new(ackRequest)
-       ackReq.doneCh = make(chan struct{})
-       ackReq.ackType = cumulativeAck
+       var msgIDToAck trackingMessageID
        if trackingID.ackCumulative() || pc.options.enableBatchIndexAck {
-               ackReq.msgID = trackingID
+               msgIDToAck = trackingID
        } else if !trackingID.tracker.hasPrevBatchAcked() {
                // get previous batch message id
-               ackReq.msgID = trackingID.prev()
+               msgIDToAck = trackingID.prev()
                trackingID.tracker.setPrevBatchAcked()
        } else {
                // waiting for all the msgs are acked in this batch
@@ -540,12 +548,13 @@ func (pc *partitionConsumer) 
internalAckIDCumulative(msgID MessageID, withRespon
 
        pc.metrics.AcksCounter.Inc()
        
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano())
 / 1.0e9)
-       // send ack request to eventsCh
-       pc.eventsCh <- ackReq
 
+       var ackReq *ackRequest
        if withResponse {
-               // wait for the request to complete if withResponse set true
+               ackReq := pc.sendCumulativeAck(&msgIDToAck)
                <-ackReq.doneCh
+       } else {
+               pc.ackGroupingTracker.addCumulative(&msgIDToAck)
        }
 
        pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
@@ -554,7 +563,20 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID 
MessageID, withRespon
                pc.unAckChunksTracker.remove(cmid)
        }
 
-       return nil
+       if ackReq == nil {
+               return nil
+       }
+       return ackReq.err
+}
+
+func (pc *partitionConsumer) sendCumulativeAck(msgID MessageID) *ackRequest {
+       ackReq := &ackRequest{
+               doneCh:  make(chan struct{}),
+               ackType: cumulativeAck,
+               msgID:   *msgID.(*trackingMessageID),
+       }
+       pc.eventsCh <- ackReq
+       return ackReq
 }
 
 func (pc *partitionConsumer) NackID(msgID MessageID) {
@@ -631,6 +653,9 @@ func (pc *partitionConsumer) Close() {
                return
        }
 
+       // flush all pending ACK requests and terminate the timer goroutine
+       pc.ackGroupingTracker.close()
+
        // close chunkedMsgCtxMap
        pc.chunkedMsgCtxMap.Close()
 
@@ -658,6 +683,7 @@ func (pc *partitionConsumer) Seek(msgID MessageID) error {
                return errors.New("unhandled messageID type")
        }
 
+       pc.ackGroupingTracker.flushAndClean()
        pc.eventsCh <- req
 
        // wait for the request to complete
@@ -715,6 +741,7 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) 
error {
                doneCh:      make(chan struct{}),
                publishTime: time,
        }
+       pc.ackGroupingTracker.flushAndClean()
        pc.eventsCh <- req
 
        // wait for the request to complete
@@ -957,6 +984,10 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                        msgID = trackingMsgID
                }
 
+               if pc.ackGroupingTracker.isDuplicate(msgID) {
+                       continue
+               }
+
                var messageIndex *uint64
                var brokerPublishTime *time.Time
                if brokerMetadata != nil {
diff --git a/pulsar/consumer_partition_test.go 
b/pulsar/consumer_partition_test.go
index fd50b70..b9a9a02 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -37,6 +37,8 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
                metrics:              newTestMetrics(),
                decryptor:            crypto.NewNoopDecryptor(),
        }
+       pc.ackGroupingTracker = 
newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+               func(id MessageID) { pc.sendIndividualAck(id) }, nil)
 
        headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
        if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -73,6 +75,8 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
                metrics:              newTestMetrics(),
                decryptor:            crypto.NewNoopDecryptor(),
        }
+       pc.ackGroupingTracker = 
newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+               func(id MessageID) { pc.sendIndividualAck(id) }, nil)
 
        headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
        if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -105,6 +109,8 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
                metrics:              newTestMetrics(),
                decryptor:            crypto.NewNoopDecryptor(),
        }
+       pc.ackGroupingTracker = 
newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+               func(id MessageID) { pc.sendIndividualAck(id) }, nil)
 
        headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
        if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 0eb7aae..de90c0e 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -3853,37 +3853,38 @@ func TestAckWithMessageID(t *testing.T) {
 }
 
 func TestBatchIndexAck(t *testing.T) {
-       tests := []struct {
-               AckWithResponse bool
-               Cumulative      bool
-       }{
-               {
-                       AckWithResponse: true,
-                       Cumulative:      true,
-               },
-               {
-                       AckWithResponse: true,
-                       Cumulative:      false,
-               },
-               {
-                       AckWithResponse: false,
-                       Cumulative:      true,
-               },
-               {
-                       AckWithResponse: false,
-                       Cumulative:      false,
-               },
-       }
-       for _, params := range tests {
-               
t.Run(fmt.Sprintf("TestBatchIndexAck_WithResponse_%v_Cumulative_%v",
-                       params.AckWithResponse, params.Cumulative),
+       type config struct {
+               ackWithResponse    bool
+               cumulative         bool
+               ackGroupingOptions *AckGroupingOptions
+       }
+       configs := make([]config, 0)
+       for _, option := range []*AckGroupingOptions{
+               nil, // MaxSize: 1000, MaxTime: 10ms
+               {MaxSize: 0, MaxTime: 0},
+               {MaxSize: 1000, MaxTime: 0},
+       } {
+               configs = append(configs, config{true, true, option})
+               configs = append(configs, config{true, false, option})
+               configs = append(configs, config{false, true, option})
+               configs = append(configs, config{false, false, option})
+       }
+
+       for _, params := range configs {
+               option := params.ackGroupingOptions
+               if option == nil {
+                       option = &AckGroupingOptions{1000, 10 * 
time.Millisecond}
+               }
+
+               
t.Run(fmt.Sprintf("TestBatchIndexAck_WithResponse_%v_Cumulative_%v_AckGroupingOption_%v_%v",
+                       params.ackWithResponse, params.cumulative, 
option.MaxSize, option.MaxTime.Milliseconds()),
                        func(t *testing.T) {
-                               runBatchIndexAckTest(t, params.AckWithResponse, 
params.Cumulative)
+                               runBatchIndexAckTest(t, params.ackWithResponse, 
params.cumulative, params.ackGroupingOptions)
                        })
        }
 }
 
-func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool) 
{
+func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, 
option *AckGroupingOptions) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
        })
@@ -3897,6 +3898,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse 
bool, cumulative bool) {
                        SubscriptionName:               "my-sub",
                        AckWithResponse:                ackWithResponse,
                        EnableBatchIndexAcknowledgment: true,
+                       AckGroupingOptions:             option,
                })
                assert.Nil(t, err)
                return consumer
diff --git a/pulsar/message.go b/pulsar/message.go
index d37692b..c44957d 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -181,3 +181,42 @@ func EarliestMessageID() MessageID {
 func LatestMessageID() MessageID {
        return latestMessageID
 }
+
+func messageIDCompare(lhs MessageID, rhs MessageID) int {
+       if lhs.LedgerID() < rhs.LedgerID() {
+               return -1
+       } else if lhs.LedgerID() > rhs.LedgerID() {
+               return 1
+       }
+       if lhs.EntryID() < rhs.EntryID() {
+               return -1
+       } else if lhs.EntryID() > rhs.EntryID() {
+               return 1
+       }
+       // When performing batch index ACK on a batched message whose batch 
size is N,
+       // the ACK order should be:
+       //   (ledger, entry, 0) -> (ledger, entry, 1) -> ... -> (ledger, entry, 
N-1) -> (ledger, entry)
+       // So we have to treat any MessageID with a batch index as preceding the 
MessageID without a batch index
+       // if they are in the same entry.
+       if lhs.BatchIdx() < 0 && rhs.BatchIdx() < 0 {
+               return 0
+       } else if lhs.BatchIdx() >= 0 && rhs.BatchIdx() < 0 {
+               return -1
+       } else if lhs.BatchIdx() < 0 && rhs.BatchIdx() >= 0 {
+               return 1
+       }
+       if lhs.BatchIdx() < rhs.BatchIdx() {
+               return -1
+       } else if lhs.BatchIdx() > rhs.BatchIdx() {
+               return 1
+       }
+       return 0
+}
+
+func messageIDHash(id MessageID) int64 {
+       return id.LedgerID() + 31*id.EntryID()
+}
+
+func messageIDIsBatch(id MessageID) bool {
+       return id.BatchIdx() >= 0 && id.BatchSize() > 0
+}

Reply via email to