This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new eb9b69e  Make nack tracker tests more robust. (#122)
eb9b69e is described below

commit eb9b69e254693df8c922b361106a6c1885c1019d
Author: cckellogg <[email protected]>
AuthorDate: Wed Dec 11 17:44:06 2019 -0800

    Make nack tracker tests more robust. (#122)
    
    * Make nack tracker tests more robust.
    
    * Fix time corner cases in tests.
    
    * Fix test race condition.
    
    * Fix comment.
---
 pulsar/negative_acks_tracker_test.go | 77 ++++++++++++++++++++++++------------
 1 file changed, 52 insertions(+), 25 deletions(-)

diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index 733114a..4dac66e 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -18,44 +18,64 @@
 package pulsar
 
 import (
-       "github.com/stretchr/testify/assert"
        "sort"
        "sync"
        "testing"
        "time"
+
+       "github.com/stretchr/testify/assert"
 )
 
+const testNackDelay = 300 * time.Millisecond
+
 type nackMockedConsumer struct {
-       sync.Mutex
-       cond   *sync.Cond
+       ch chan messageID
+       closed bool
+       lock sync.Mutex
        msgIds []messageID
 }
 
-func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) {
-       nmc.Lock()
-       if nmc.msgIds == nil {
-               nmc.msgIds = msgIds
-               sort.Slice(msgIds, func(i, j int) bool {
-                       return msgIds[i].ledgerID < msgIds[j].entryID
-               })
-               nmc.cond.Signal()
+func newNackMockedConsumer() *nackMockedConsumer {
+       t := &nackMockedConsumer{
+               ch: make(chan messageID, 10),
        }
+       go func() {
+               // since the client ticks at an interval of delay / 3
+               // wait another interval to ensure we get all messages
+               time.Sleep(testNackDelay + 101 * time.Millisecond)
+               t.lock.Lock()
+               defer t.lock.Unlock()
+               t.closed = true
+               close(t.ch)
+       }()
+       return t
+}
 
-       nmc.Unlock()
+func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) {
+       nmc.lock.Lock()
+       defer nmc.lock.Unlock()
+       if nmc.closed {
+               return
+       }
+       for _, id := range msgIds {
+               nmc.ch <- id
+       }
 }
 
-func (nmc *nackMockedConsumer) Wait() []messageID {
-       nmc.Lock()
-       defer nmc.Unlock()
-       nmc.cond.Wait()
+func sortMessageIds(msgIds []messageID) []messageID {
+       sort.Slice(msgIds, func(i, j int) bool {
+               return msgIds[i].ledgerID < msgIds[j].entryID
+       })
+       return msgIds
+}
 
-       return nmc.msgIds
+func (nmc *nackMockedConsumer) Wait() <- chan messageID {
+       return nmc.ch
 }
 
 func TestNacksTracker(t *testing.T) {
-       nmc := &nackMockedConsumer{}
-       nmc.cond = sync.NewCond(nmc)
-       nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+       nmc := newNackMockedConsumer()
+       nacks := newNegativeAcksTracker(nmc, testNackDelay)
 
        nacks.Add(&messageID{
                ledgerID: 1,
@@ -69,7 +89,11 @@ func TestNacksTracker(t *testing.T) {
                batchIdx: 1,
        })
 
-       msgIds := nmc.Wait()
+       msgIds := make([]messageID, 0)
+       for id := range nmc.Wait() {
+               msgIds = append(msgIds, id)
+       }
+       msgIds = sortMessageIds(msgIds)
 
        assert.Equal(t, 2, len(msgIds))
        assert.Equal(t, int64(1), msgIds[0].ledgerID)
@@ -81,9 +105,8 @@ func TestNacksTracker(t *testing.T) {
 }
 
 func TestNacksWithBatchesTracker(t *testing.T) {
-       nmc := &nackMockedConsumer{}
-       nmc.cond = sync.NewCond(nmc)
-       nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+       nmc := newNackMockedConsumer()
+       nacks := newNegativeAcksTracker(nmc, testNackDelay)
 
        nacks.Add(&messageID{
                ledgerID: 1,
@@ -109,7 +132,11 @@ func TestNacksWithBatchesTracker(t *testing.T) {
                batchIdx: 1,
        })
 
-       msgIds := nmc.Wait()
+       msgIds := make([]messageID, 0)
+       for id := range nmc.Wait() {
+               msgIds = append(msgIds, id)
+       }
+       msgIds = sortMessageIds(msgIds)
 
        assert.Equal(t, 2, len(msgIds))
        assert.Equal(t, int64(1), msgIds[0].ledgerID)

Reply via email to