This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 7d257b0 Support grouping ACK requests by time and size (#957)
7d257b0 is described below
commit 7d257b01b1216e0009495a4aaf83b1716ba458a9
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Feb 22 08:38:22 2023 +0800
Support grouping ACK requests by time and size (#957)
* Support grouping ACK requests by time and size
Fixes #949
### Motivation
Currently the Go client does not support grouping ACK requests, so each
time `Ack` (or similar APIs) is called, an ACK request will be sent,
which could degrade performance. We need to support configuring
the time and size to cache `MessageID` before sending ACK requests.
### Modifications
- Add an `AckGroupingOptions` field to `ConsumerOptions`, when it's nil,
use 100ms as the max time and 1000 as the max size.
- Add an `ackGroupingTracker` interface to support grouping ACK requests
- When `AckWithResponse` is false, adding the `MessageID` instance to
the tracker instead of sending the requests to `eventsCh`.
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- Added `ack_grouping_tracker_test.go` to verify `ackGroupingTracker`
in various cases
- The consumer side change can be covered by existing tests because
the default `AckGroupingOptions` config is
`{ MaxSize: 1000, MaxTime: 100*time.Millisecond }`.
* Fix flushAndClean race
* Use unbuffered channel for flush operations
* Apply different AckGroupingOptions and expose this config
---
pulsar/ack_grouping_tracker.go | 298 ++++++++++++++++++++++++++++++++++++
pulsar/ack_grouping_tracker_test.go | 197 ++++++++++++++++++++++++
pulsar/consumer.go | 21 +++
pulsar/consumer_impl.go | 1 +
pulsar/consumer_partition.go | 83 ++++++----
pulsar/consumer_partition_test.go | 6 +
pulsar/consumer_test.go | 54 +++----
pulsar/message.go | 39 +++++
8 files changed, 647 insertions(+), 52 deletions(-)
diff --git a/pulsar/ack_grouping_tracker.go b/pulsar/ack_grouping_tracker.go
new file mode 100644
index 0000000..dbc70f5
--- /dev/null
+++ b/pulsar/ack_grouping_tracker.go
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "time"
+
+ "github.com/bits-and-blooms/bitset"
+)
+
+type ackGroupingTracker interface {
+ add(id MessageID)
+
+ addCumulative(id MessageID)
+
+ isDuplicate(id MessageID) bool
+
+ flush()
+
+ flushAndClean()
+
+ close()
+}
+
+type ackFlushType int
+
+const (
+ flushOnly ackFlushType = iota
+ flushAndClean
+ flushAndClose
+)
+
+func newAckGroupingTracker(options *AckGroupingOptions,
+ ackIndividual func(id MessageID),
+ ackCumulative func(id MessageID)) ackGroupingTracker {
+ if options == nil {
+ options = &AckGroupingOptions{
+ MaxSize: 1000,
+ MaxTime: 100 * time.Millisecond,
+ }
+ }
+
+ if options.MaxSize <= 1 {
+ return &immediateAckGroupingTracker{
+ ackIndividual: ackIndividual,
+ ackCumulative: ackCumulative,
+ }
+ }
+
+ c := &cachedAcks{
+ singleAcks: make([]MessageID, options.MaxSize),
+ pendingAcks: make(map[int64]*bitset.BitSet),
+ lastCumulativeAck: EarliestMessageID(),
+ ackIndividual: ackIndividual,
+ ackCumulative: ackCumulative,
+ ackList: func(ids []MessageID) {
+ // TODO: support ack a list of MessageIDs
+ for _, id := range ids {
+ ackIndividual(id)
+ }
+ },
+ }
+
+ timeout := time.NewTicker(time.Hour)
+ if options.MaxTime > 0 {
+ timeout = time.NewTicker(options.MaxTime)
+ } else {
+ timeout.Stop()
+ }
+ t := &timedAckGroupingTracker{
+ ackIndividualCh: make(chan MessageID),
+ ackCumulativeCh: make(chan MessageID),
+ duplicateIDCh: make(chan MessageID),
+ duplicateResultCh: make(chan bool),
+ flushCh: make(chan ackFlushType),
+ waitFlushCh: make(chan bool),
+ }
+ go func() {
+ for {
+ select {
+ case id := <-t.ackIndividualCh:
+ if c.addAndCheckIfFull(id) {
+ c.flushIndividualAcks()
+ if options.MaxTime > 0 {
+ timeout.Reset(options.MaxTime)
+ }
+ }
+ case id := <-t.ackCumulativeCh:
+ c.tryUpdateLastCumulativeAck(id)
+ if options.MaxTime <= 0 {
+ c.flushCumulativeAck()
+ }
+ case id := <-t.duplicateIDCh:
+ t.duplicateResultCh <- c.isDuplicate(id)
+ case <-timeout.C:
+ c.flush()
+ case ackFlushType := <-t.flushCh:
+ timeout.Stop()
+ c.flush()
+ if ackFlushType == flushAndClean {
+ c.clean()
+ }
+ t.waitFlushCh <- true
+ if ackFlushType == flushAndClose {
+ return
+ }
+ }
+ }
+ }()
+ return t
+}
+
+type immediateAckGroupingTracker struct {
+ ackIndividual func(id MessageID)
+ ackCumulative func(id MessageID)
+}
+
+func (i *immediateAckGroupingTracker) add(id MessageID) {
+ i.ackIndividual(id)
+}
+
+func (i *immediateAckGroupingTracker) addCumulative(id MessageID) {
+ i.ackCumulative(id)
+}
+
+func (i *immediateAckGroupingTracker) isDuplicate(id MessageID) bool {
+ return false
+}
+
+func (i *immediateAckGroupingTracker) flush() {
+}
+
+func (i *immediateAckGroupingTracker) flushAndClean() {
+}
+
+func (i *immediateAckGroupingTracker) close() {
+}
+
+type cachedAcks struct {
+ singleAcks []MessageID
+ index int
+
+ // Key is the hash code of the ledger id and the entry id,
+ // Value is the bit set that represents which messages are acknowledged if the entry stores a batch.
+ // The bit 1 represents the message has not been acknowledged, i.e. the bits "111" represents all messages
+ // in the batch whose batch size is 3 are not acknowledged.
+ // After the 1st message (i.e. batch index is 0) is acknowledged, the bits will become "011".
+ // Value is nil if the entry represents a single message.
+ pendingAcks map[int64]*bitset.BitSet
+
+ lastCumulativeAck MessageID
+ cumulativeAckRequired bool
+
+ ackIndividual func(id MessageID)
+ ackCumulative func(id MessageID)
+ ackList func(ids []MessageID)
+}
+
+func (t *cachedAcks) addAndCheckIfFull(id MessageID) bool {
+ t.singleAcks[t.index] = id
+ t.index++
+ key := messageIDHash(id)
+ ackSet, found := t.pendingAcks[key]
+ if !found {
+ if messageIDIsBatch(id) {
+ ackSet = bitset.New(uint(id.BatchSize()))
+ for i := 0; i < int(id.BatchSize()); i++ {
+ ackSet.Set(uint(i))
+ }
+ t.pendingAcks[key] = ackSet
+ } else {
+ t.pendingAcks[key] = nil
+ }
+ }
+ if ackSet != nil {
+ ackSet.Clear(uint(id.BatchIdx()))
+ }
+ return t.index == len(t.singleAcks)
+}
+
+func (t *cachedAcks) tryUpdateLastCumulativeAck(id MessageID) {
+ if messageIDCompare(t.lastCumulativeAck, id) < 0 {
+ t.lastCumulativeAck = id
+ t.cumulativeAckRequired = true
+ }
+}
+
+func (t *cachedAcks) isDuplicate(id MessageID) bool {
+ if messageIDCompare(t.lastCumulativeAck, id) >= 0 {
+ return true
+ }
+ ackSet, found := t.pendingAcks[messageIDHash(id)]
+ if !found {
+ return false
+ }
+ if ackSet == nil || !messageIDIsBatch(id) {
+ // NOTE: should we panic when ackSet != nil and
messageIDIsBatch(id) is true?
+ return true
+ }
+ // 0 represents the message has been acknowledged
+ return !ackSet.Test(uint(id.BatchIdx()))
+}
+
+func (t *cachedAcks) flushIndividualAcks() {
+ if t.index > 0 {
+ t.ackList(t.singleAcks[0:t.index])
+ for _, id := range t.singleAcks[0:t.index] {
+ key := messageIDHash(id)
+ ackSet, found := t.pendingAcks[key]
+ if !found {
+ continue
+ }
+ if ackSet == nil {
+ delete(t.pendingAcks, key)
+ } else {
+ ackSet.Clear(uint(id.BatchIdx()))
+ if ackSet.None() { // all messages have been
acknowledged
+ delete(t.pendingAcks, key)
+ }
+ }
+ delete(t.pendingAcks, messageIDHash(id))
+ }
+ t.index = 0
+ }
+}
+
+func (t *cachedAcks) flushCumulativeAck() {
+ if t.cumulativeAckRequired {
+ t.ackCumulative(t.lastCumulativeAck)
+ t.cumulativeAckRequired = false
+ }
+}
+
+func (t *cachedAcks) flush() {
+ t.flushIndividualAcks()
+ t.flushCumulativeAck()
+}
+
+func (t *cachedAcks) clean() {
+ maxSize := len(t.singleAcks)
+ t.singleAcks = make([]MessageID, maxSize)
+ t.index = 0
+ t.pendingAcks = make(map[int64]*bitset.BitSet)
+ t.lastCumulativeAck = EarliestMessageID()
+ t.cumulativeAckRequired = false
+}
+
+type timedAckGroupingTracker struct {
+ ackIndividualCh chan MessageID
+ ackCumulativeCh chan MessageID
+ duplicateIDCh chan MessageID
+ duplicateResultCh chan bool
+ flushCh chan ackFlushType
+ waitFlushCh chan bool
+}
+
+func (t *timedAckGroupingTracker) add(id MessageID) {
+ t.ackIndividualCh <- id
+}
+
+func (t *timedAckGroupingTracker) addCumulative(id MessageID) {
+ t.ackCumulativeCh <- id
+}
+
+func (t *timedAckGroupingTracker) isDuplicate(id MessageID) bool {
+ t.duplicateIDCh <- id
+ return <-t.duplicateResultCh
+}
+
+func (t *timedAckGroupingTracker) flush() {
+ t.flushCh <- flushOnly
+ <-t.waitFlushCh
+}
+
+func (t *timedAckGroupingTracker) flushAndClean() {
+ t.flushCh <- flushAndClean
+ <-t.waitFlushCh
+}
+
+func (t *timedAckGroupingTracker) close() {
+ t.flushCh <- flushAndClose
+ <-t.waitFlushCh
+}
diff --git a/pulsar/ack_grouping_tracker_test.go
b/pulsar/ack_grouping_tracker_test.go
new file mode 100644
index 0000000..d7903e8
--- /dev/null
+++ b/pulsar/ack_grouping_tracker_test.go
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNoCacheTracker(t *testing.T) {
+ tests := []AckGroupingOptions{
+ {
+ MaxSize: 0,
+ MaxTime: 10 * time.Hour,
+ },
+ {
+ MaxSize: 1,
+ MaxTime: 10 * time.Hour,
+ },
+ }
+ for _, option := range tests {
+ t.Run(fmt.Sprintf("TestAckImmediately_size_%v_time_%vs",
option.MaxSize, option.MaxTime.Seconds()),
+ func(t *testing.T) {
+ ledgerID0 := int64(-1)
+ ledgerID1 := int64(-1)
+ tracker := newAckGroupingTracker(&option,
+ func(id MessageID) { ledgerID0 =
id.LedgerID() },
+ func(id MessageID) { ledgerID1 =
id.LedgerID() })
+
+ tracker.add(&messageID{ledgerID: 1})
+ assert.Equal(t, atomic.LoadInt64(&ledgerID0),
int64(1))
+ tracker.addCumulative(&messageID{ledgerID: 2})
+ assert.Equal(t, atomic.LoadInt64(&ledgerID1),
int64(2))
+ })
+ }
+}
+
+type mockAcker struct {
+ sync.Mutex
+ ledgerIDs []int64
+ cumulativeLedgerID int64
+}
+
+func (a *mockAcker) ack(id MessageID) {
+ defer a.Unlock()
+ a.Lock()
+ a.ledgerIDs = append(a.ledgerIDs, id.LedgerID())
+}
+
+func (a *mockAcker) ackCumulative(id MessageID) {
+ atomic.StoreInt64(&a.cumulativeLedgerID, id.LedgerID())
+}
+
+func (a *mockAcker) getLedgerIDs() []int64 {
+ defer a.Unlock()
+ a.Lock()
+ return a.ledgerIDs
+}
+
+func (a *mockAcker) getCumulativeLedgerID() int64 {
+ return atomic.LoadInt64(&a.cumulativeLedgerID)
+}
+
+func (a *mockAcker) reset() {
+ a.ledgerIDs = make([]int64, 0)
+ a.cumulativeLedgerID = int64(0)
+}
+
+func TestCachedTracker(t *testing.T) {
+ var acker mockAcker
+ tracker := newAckGroupingTracker(&AckGroupingOptions{MaxSize: 3,
MaxTime: 0},
+ func(id MessageID) { acker.ack(id) }, func(id MessageID) {
acker.ackCumulative(id) })
+
+ tracker.add(&messageID{ledgerID: 1})
+ tracker.add(&messageID{ledgerID: 2})
+ for i := 1; i <= 2; i++ {
+ assert.True(t, tracker.isDuplicate(&messageID{ledgerID:
int64(i)}))
+ }
+ assert.Equal(t, 0, len(acker.getLedgerIDs()))
+ tracker.add(&messageID{ledgerID: 3})
+ assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) > 0
},
+ 10*time.Millisecond, 2*time.Millisecond)
+ assert.Equal(t, []int64{1, 2, 3}, acker.getLedgerIDs())
+ for i := 1; i <= 3; i++ {
+ assert.False(t, tracker.isDuplicate(&messageID{ledgerID:
int64(i)}))
+ }
+
+ tracker.add(&messageID{ledgerID: 4})
+ // 4 is cached but won't be flushed yet because the cache is not full
+ assert.Equal(t, []int64{1, 2, 3}, acker.getLedgerIDs())
+
+ assert.False(t, tracker.isDuplicate(&messageID{ledgerID: 5}))
+ tracker.addCumulative(&messageID{ledgerID: 5})
+ for i := 0; i <= 5; i++ {
+ assert.True(t, tracker.isDuplicate(&messageID{ledgerID:
int64(i)}))
+ }
+ assert.Equal(t, int64(5), acker.getCumulativeLedgerID())
+ assert.False(t, tracker.isDuplicate(&messageID{ledgerID: int64(6)}))
+
+ tracker.flush()
+ assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) > 3
},
+ 10*time.Millisecond, 2*time.Millisecond)
+ assert.Equal(t, []int64{1, 2, 3, 4}, acker.getLedgerIDs())
+}
+
+func TestTimedTrackerIndividualAck(t *testing.T) {
+ var acker mockAcker
+ // MaxSize: 1000, MaxTime: 100ms
+ tracker := newAckGroupingTracker(nil, func(id MessageID) {
acker.ack(id) }, nil)
+
+ expected := make([]int64, 0)
+ for i := 0; i < 999; i++ {
+ tracker.add(&messageID{ledgerID: int64(i)})
+ expected = append(expected, int64(i))
+ }
+ assert.Equal(t, 0, len(acker.getLedgerIDs()))
+
+ // case 1: flush because the tracker timed out
+ assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) ==
999 },
+ 150*time.Millisecond, 10*time.Millisecond)
+ assert.Equal(t, expected, acker.getLedgerIDs())
+
+ // case 2: flush because cache is full
+ time.Sleep(50) // see case 3
+ acker.reset()
+ expected = append(expected, 999)
+ for i := 0; i < 1001; i++ {
+ tracker.add(&messageID{ledgerID: int64(i)})
+ }
+ assert.Equal(t, expected, acker.getLedgerIDs())
+
+ // case 3: flush will reset the timer
+ start := time.Now()
+ assert.Eventually(t, func() bool { return len(acker.getLedgerIDs()) >
1000 },
+ 150*time.Millisecond, 10*time.Millisecond)
+ elapsed := time.Since(start)
+ assert.GreaterOrEqual(t, elapsed, int64(100), "elapsed", elapsed)
+ assert.Equal(t, append(expected, 1000), acker.getLedgerIDs())
+}
+
+func TestTimedTrackerCumulativeAck(t *testing.T) {
+ var acker mockAcker
+ // MaxTime is 100ms
+ tracker := newAckGroupingTracker(nil, nil, func(id MessageID) {
acker.ackCumulative(id) })
+
+ // case 1: flush because of the timeout
+ tracker.addCumulative(&messageID{ledgerID: 1})
+ assert.NotEqual(t, int64(1), acker.getCumulativeLedgerID())
+ assert.Eventually(t, func() bool { return acker.getCumulativeLedgerID()
== int64(1) },
+ 150*time.Millisecond, 10*time.Millisecond)
+ assert.Equal(t, int64(1), acker.getCumulativeLedgerID())
+
+ // case 2: flush manually
+ tracker.addCumulative(&messageID{ledgerID: 2})
+ tracker.flush()
+ assert.Equal(t, int64(2), acker.getCumulativeLedgerID())
+
+ // case 3: older MessageID cannot be acknowledged
+ tracker.addCumulative(&messageID{ledgerID: 1})
+ tracker.flush()
+ assert.Equal(t, int64(2), acker.getCumulativeLedgerID())
+}
+
+func TestTimedTrackerIsDuplicate(t *testing.T) {
+ tracker := newAckGroupingTracker(nil, func(id MessageID) {}, func(id
MessageID) {})
+
+ tracker.add(messageID{batchIdx: 0, batchSize: 3})
+ tracker.add(messageID{batchIdx: 2, batchSize: 3})
+ assert.True(t, tracker.isDuplicate(messageID{batchIdx: 0, batchSize:
3}))
+ assert.False(t, tracker.isDuplicate(messageID{batchIdx: 1, batchSize:
3}))
+ assert.True(t, tracker.isDuplicate(messageID{batchIdx: 2, batchSize:
3}))
+
+ tracker.flush()
+ assert.False(t, tracker.isDuplicate(messageID{batchIdx: 0, batchSize:
3}))
+ assert.False(t, tracker.isDuplicate(messageID{batchIdx: 1, batchSize:
3}))
+ assert.False(t, tracker.isDuplicate(messageID{batchIdx: 2, batchSize:
3}))
+}
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 8bae57d..9576d7a 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -80,6 +80,20 @@ type DLQPolicy struct {
RetryLetterTopic string
}
+// AckGroupingOptions controls how to group ACK requests.
+// If MaxSize is 0 or 1, any ACK request will be sent immediately.
+// Otherwise, the ACK requests will be cached until one of the following conditions is met:
+// 1. There are `MaxSize` pending ACK requests.
+// 2. `MaxTime` is greater than 1 microsecond and ACK requests have been cached for `MaxTime`.
+// In particular, for cumulative acknowledgment, only the latest ACK is cached and it will only be sent after `MaxTime`.
+type AckGroupingOptions struct {
+ // The maximum number of ACK requests to cache
+ MaxSize uint32
+
+ // The maximum time to cache ACK requests
+ MaxTime time.Duration
+}
+
// ConsumerOptions is used to configure and create instances of Consumer.
type ConsumerOptions struct {
// Topic specifies the topic this consumer will subscribe on.
@@ -215,6 +229,13 @@ type ConsumerOptions struct {
// Enable or disable batch index acknowledgment. To enable this
feature, ensure batch index acknowledgment
// is enabled on the broker side. (default: false)
EnableBatchIndexAcknowledgment bool
+
+ // Controls how to group ACK requests, the default value is nil, which
means:
+ // MaxSize: 1000
+ // MaxTime: 100*time.Millisecond
+ // NOTE: This option does not work if AckWithResponse is true
+ // because there are only synchronous APIs for acknowledgment
+ AckGroupingOptions *AckGroupingOptions
}
// Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index bf136c8..8ee1822 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -398,6 +398,7 @@ func (c *consumer) internalTopicSubscribeToPartitions()
error {
autoAckIncompleteChunk:
c.options.AutoAckIncompleteChunk,
consumerEventListener:
c.options.EventListener,
enableBatchIndexAck:
c.options.EnableBatchIndexAcknowledgment,
+ ackGroupingOptions:
c.options.AckGroupingOptions,
}
cons, err := newPartitionConsumer(c, c.client, opts,
c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e723f8a..95a8d32 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -116,6 +116,7 @@ type partitionConsumerOpts struct {
// in failover mode, this callback will be called when consumer change
consumerEventListener ConsumerEventListener
enableBatchIndexAck bool
+ ackGroupingOptions *AckGroupingOptions
}
type ConsumerEventListener interface {
@@ -167,6 +168,7 @@ type partitionConsumer struct {
chunkedMsgCtxMap *chunkedMsgCtxMap
unAckChunksTracker *unAckChunksTracker
+ ackGroupingTracker ackGroupingTracker
}
func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
@@ -310,6 +312,9 @@ func newPartitionConsumer(parent Consumer, client *client,
options *partitionCon
pc.availablePermits = &availablePermits{pc: pc}
pc.chunkedMsgCtxMap =
newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
pc.unAckChunksTracker = newUnAckChunksTracker(pc)
+ pc.ackGroupingTracker =
newAckGroupingTracker(options.ackGroupingOptions,
+ func(id MessageID) { pc.sendIndividualAck(id) },
+ func(id MessageID) { pc.sendCumulativeAck(id) })
pc.setConsumerState(consumerInit)
pc.log = client.log.SubLogger(log.Fields{
"name": pc.name,
@@ -467,30 +472,35 @@ func (pc *partitionConsumer) ackID(msgID MessageID,
withResponse bool) error {
return errors.New("failed to convert trackingMessageID")
}
- ackReq := new(ackRequest)
- ackReq.doneCh = make(chan struct{})
- ackReq.ackType = individualAck
if !trackingID.Undefined() && trackingID.ack() {
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano())
/ 1.0e9)
- ackReq.msgID = trackingID
- // send ack request to eventsCh
- pc.eventsCh <- ackReq
-
- if withResponse {
- <-ackReq.doneCh
- }
-
- pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
- } else if pc.options.enableBatchIndexAck {
- ackReq.msgID = trackingID
- pc.eventsCh <- ackReq
+ } else if !pc.options.enableBatchIndexAck {
+ return nil
}
+ var ackReq *ackRequest
if withResponse {
- return ackReq.err
+ ackReq := pc.sendIndividualAck(&trackingID)
+ <-ackReq.doneCh
+ } else {
+ pc.ackGroupingTracker.add(&trackingID)
}
- return nil
+ pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
+ if ackReq == nil {
+ return nil
+ }
+ return ackReq.err
+}
+
+func (pc *partitionConsumer) sendIndividualAck(msgID MessageID) *ackRequest {
+ ackReq := &ackRequest{
+ doneCh: make(chan struct{}),
+ ackType: individualAck,
+ msgID: *msgID.(*trackingMessageID),
+ }
+ pc.eventsCh <- ackReq
+ return ackReq
}
func (pc *partitionConsumer) AckIDWithResponse(msgID MessageID) error {
@@ -524,14 +534,12 @@ func (pc *partitionConsumer)
internalAckIDCumulative(msgID MessageID, withRespon
return nil
}
- ackReq := new(ackRequest)
- ackReq.doneCh = make(chan struct{})
- ackReq.ackType = cumulativeAck
+ var msgIDToAck trackingMessageID
if trackingID.ackCumulative() || pc.options.enableBatchIndexAck {
- ackReq.msgID = trackingID
+ msgIDToAck = trackingID
} else if !trackingID.tracker.hasPrevBatchAcked() {
// get previous batch message id
- ackReq.msgID = trackingID.prev()
+ msgIDToAck = trackingID.prev()
trackingID.tracker.setPrevBatchAcked()
} else {
// waiting for all the msgs are acked in this batch
@@ -540,12 +548,13 @@ func (pc *partitionConsumer)
internalAckIDCumulative(msgID MessageID, withRespon
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano())
/ 1.0e9)
- // send ack request to eventsCh
- pc.eventsCh <- ackReq
+ var ackReq *ackRequest
if withResponse {
- // wait for the request to complete if withResponse set true
+ ackReq := pc.sendCumulativeAck(&msgIDToAck)
<-ackReq.doneCh
+ } else {
+ pc.ackGroupingTracker.addCumulative(&msgIDToAck)
}
pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
@@ -554,7 +563,20 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID
MessageID, withRespon
pc.unAckChunksTracker.remove(cmid)
}
- return nil
+ if ackReq == nil {
+ return nil
+ }
+ return ackReq.err
+}
+
+func (pc *partitionConsumer) sendCumulativeAck(msgID MessageID) *ackRequest {
+ ackReq := &ackRequest{
+ doneCh: make(chan struct{}),
+ ackType: cumulativeAck,
+ msgID: *msgID.(*trackingMessageID),
+ }
+ pc.eventsCh <- ackReq
+ return ackReq
}
func (pc *partitionConsumer) NackID(msgID MessageID) {
@@ -631,6 +653,9 @@ func (pc *partitionConsumer) Close() {
return
}
+ // flush all pending ACK requests and terminate the timer goroutine
+ pc.ackGroupingTracker.close()
+
// close chunkedMsgCtxMap
pc.chunkedMsgCtxMap.Close()
@@ -658,6 +683,7 @@ func (pc *partitionConsumer) Seek(msgID MessageID) error {
return errors.New("unhandled messageID type")
}
+ pc.ackGroupingTracker.flushAndClean()
pc.eventsCh <- req
// wait for the request to complete
@@ -715,6 +741,7 @@ func (pc *partitionConsumer) SeekByTime(time time.Time)
error {
doneCh: make(chan struct{}),
publishTime: time,
}
+ pc.ackGroupingTracker.flushAndClean()
pc.eventsCh <- req
// wait for the request to complete
@@ -957,6 +984,10 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
msgID = trackingMsgID
}
+ if pc.ackGroupingTracker.isDuplicate(msgID) {
+ continue
+ }
+
var messageIndex *uint64
var brokerPublishTime *time.Time
if brokerMetadata != nil {
diff --git a/pulsar/consumer_partition_test.go
b/pulsar/consumer_partition_test.go
index fd50b70..b9a9a02 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -37,6 +37,8 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
+ pc.ackGroupingTracker =
newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+ func(id MessageID) { pc.sendIndividualAck(id) }, nil)
headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -73,6 +75,8 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
+ pc.ackGroupingTracker =
newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+ func(id MessageID) { pc.sendIndividualAck(id) }, nil)
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
@@ -105,6 +109,8 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
+ pc.ackGroupingTracker =
newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
+ func(id MessageID) { pc.sendIndividualAck(id) }, nil)
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
if err := pc.MessageReceived(nil, headersAndPayload); err != nil {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 0eb7aae..de90c0e 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -3853,37 +3853,38 @@ func TestAckWithMessageID(t *testing.T) {
}
func TestBatchIndexAck(t *testing.T) {
- tests := []struct {
- AckWithResponse bool
- Cumulative bool
- }{
- {
- AckWithResponse: true,
- Cumulative: true,
- },
- {
- AckWithResponse: true,
- Cumulative: false,
- },
- {
- AckWithResponse: false,
- Cumulative: true,
- },
- {
- AckWithResponse: false,
- Cumulative: false,
- },
- }
- for _, params := range tests {
-
t.Run(fmt.Sprintf("TestBatchIndexAck_WithResponse_%v_Cumulative_%v",
- params.AckWithResponse, params.Cumulative),
+ type config struct {
+ ackWithResponse bool
+ cumulative bool
+ ackGroupingOptions *AckGroupingOptions
+ }
+ configs := make([]config, 0)
+ for _, option := range []*AckGroupingOptions{
+ nil, // default: MaxSize: 1000, MaxTime: 100ms
+ {MaxSize: 0, MaxTime: 0},
+ {MaxSize: 1000, MaxTime: 0},
+ } {
+ configs = append(configs, config{true, true, option})
+ configs = append(configs, config{true, false, option})
+ configs = append(configs, config{false, true, option})
+ configs = append(configs, config{false, false, option})
+ }
+
+ for _, params := range configs {
+ option := params.ackGroupingOptions
+ if option == nil {
+ option = &AckGroupingOptions{1000, 10 *
time.Millisecond}
+ }
+
+
t.Run(fmt.Sprintf("TestBatchIndexAck_WithResponse_%v_Cumulative_%v_AckGroupingOption_%v_%v",
+ params.ackWithResponse, params.cumulative,
option.MaxSize, option.MaxTime.Milliseconds()),
func(t *testing.T) {
- runBatchIndexAckTest(t, params.AckWithResponse,
params.Cumulative)
+ runBatchIndexAckTest(t, params.ackWithResponse,
params.cumulative, params.ackGroupingOptions)
})
}
}
-func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool)
{
+func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool,
option *AckGroupingOptions) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
@@ -3897,6 +3898,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse
bool, cumulative bool) {
SubscriptionName: "my-sub",
AckWithResponse: ackWithResponse,
EnableBatchIndexAcknowledgment: true,
+ AckGroupingOptions: option,
})
assert.Nil(t, err)
return consumer
diff --git a/pulsar/message.go b/pulsar/message.go
index d37692b..c44957d 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -181,3 +181,42 @@ func EarliestMessageID() MessageID {
func LatestMessageID() MessageID {
return latestMessageID
}
+
+func messageIDCompare(lhs MessageID, rhs MessageID) int {
+ if lhs.LedgerID() < rhs.LedgerID() {
+ return -1
+ } else if lhs.LedgerID() > rhs.LedgerID() {
+ return 1
+ }
+ if lhs.EntryID() < rhs.EntryID() {
+ return -1
+ } else if lhs.EntryID() > rhs.EntryID() {
+ return 1
+ }
+ // When performing batch index ACK on a batched message whose batch
size is N,
+ // the ACK order should be:
+ // (ledger, entry, 0) -> (ledger, entry, 1) -> ... -> (ledger, entry,
N-1) -> (ledger, entry)
+ // So we have to treat any MessageID with the batch index precedes the
MessageID without the batch index
+ // if they are in the same entry.
+ if lhs.BatchIdx() < 0 && rhs.BatchIdx() < 0 {
+ return 0
+ } else if lhs.BatchIdx() >= 0 && rhs.BatchIdx() < 0 {
+ return -1
+ } else if lhs.BatchIdx() < 0 && rhs.BatchIdx() >= 0 {
+ return 1
+ }
+ if lhs.BatchIdx() < rhs.BatchIdx() {
+ return -1
+ } else if lhs.BatchIdx() > rhs.BatchIdx() {
+ return 1
+ }
+ return 0
+}
+
+func messageIDHash(id MessageID) int64 {
+ return id.LedgerID() + 31*id.EntryID()
+}
+
+func messageIDIsBatch(id MessageID) bool {
+ return id.BatchIdx() >= 0 && id.BatchSize() > 0
+}