This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c24191b  Clearing message queues after seek requests (#419)
c24191b is described below

commit c24191b37a98d8731b34fb9fd44c151a2ef3a99e
Author: Miloš Matijašević <[email protected]>
AuthorDate: Tue Jan 26 04:04:52 2021 +0100

    Clearing message queues after seek requests (#419)
    
    ### Motivation
    
    Message queues should be cleared after seek requests (`Seek` and 
`SeekByTime`). If this is not done, messages that remain in the message queues 
will be consumed before the sought message.
    
    ### Modifications
    
    Clearing `queueCh` and `messageCh` after a successful seek request in 
partition_consumer.go.
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    This change is already covered by existing tests, such as 
`TestConsumerSeekByTime` and `TestConsumerSeek`.
    
    This change added tests and can be verified as follows:
    
      - Extended the `TestConsumerSeekByTime` and `TestConsumerSeek` tests to 
verify that the sought message is consumed correctly even when some messages 
remain in the message queues.
    
    
    Signed-off-by: milos-matijasevic <[email protected]>
---
 pulsar/consumer_partition.go | 42 +++++++++++++++++++++++++++++++++++++-----
 pulsar/consumer_test.go      | 20 ++++++++++++--------
 2 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index d970f65..a787ffb 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -108,11 +108,12 @@ type partitionConsumer struct {
        startMessageID  trackingMessageID
        lastDequeuedMsg trackingMessageID
 
-       eventsCh        chan interface{}
-       connectedCh     chan struct{}
-       connectClosedCh chan connectionClosed
-       closeCh         chan struct{}
-       clearQueueCh    chan func(id trackingMessageID)
+       eventsCh             chan interface{}
+       connectedCh          chan struct{}
+       connectClosedCh      chan connectionClosed
+       closeCh              chan struct{}
+       clearQueueCh         chan func(id trackingMessageID)
+       clearMessageQueuesCh chan chan struct{}
 
        nackTracker *negativeAcksTracker
        dlq         *dlqRouter
@@ -144,6 +145,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                connectClosedCh:      make(chan connectionClosed, 10),
                closeCh:              make(chan struct{}),
                clearQueueCh:         make(chan func(id trackingMessageID)),
+               clearMessageQueuesCh: make(chan chan struct{}),
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
                dlq:                  dlq,
                metrics:              metrics,
@@ -361,6 +363,7 @@ func (pc *partitionConsumer) requestSeek(msgID messageID) 
error {
                pc.log.WithError(err).Error("Failed to reset to message id")
                return err
        }
+       pc.clearMessageChannels()
        return nil
 }
 
@@ -395,7 +398,15 @@ func (pc *partitionConsumer) internalSeekByTime(seek 
*seekByTimeRequest) {
        if err != nil {
                pc.log.WithError(err).Error("Failed to reset to message publish 
time")
                seek.err = err
+               return
        }
+       pc.clearMessageChannels()
+}
+
+func (pc *partitionConsumer) clearMessageChannels() {
+       doneCh := make(chan struct{})
+       pc.clearMessageQueuesCh <- doneCh
+       <-doneCh
 }
 
 func (pc *partitionConsumer) internalAck(req *ackRequest) {
@@ -659,6 +670,27 @@ func (pc *partitionConsumer) dispatcher() {
                        }
 
                        clearQueueCb(nextMessageInQueue)
+
+               case doneCh := <-pc.clearMessageQueuesCh:
+                       for len(pc.queueCh) > 0 {
+                               <-pc.queueCh
+                       }
+                       for len(pc.messageCh) > 0 {
+                               <-pc.messageCh
+                       }
+                       messages = nil
+
+                       // reset available permits
+                       pc.availablePermits = 0
+                       initialPermits := uint32(pc.queueSize)
+
+                       pc.log.Debugf("dispatcher requesting initial 
permits=%d", initialPermits)
+                       // send initial permits
+                       if err := pc.internalFlow(initialPermits); err != nil {
+                               pc.log.WithError(err).Error("unable to send 
initial permits to broker")
+                       }
+
+                       close(doneCh)
                }
        }
 }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 82a1f97..f0b27c6 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -795,20 +795,22 @@ func TestConsumerSeek(t *testing.T) {
        assert.Nil(t, err)
        defer consumer.Close()
 
-       const N = 10
+       // Use value bigger than 1000 to full-fill queue channel with size 1000 
and message channel with size 10
+       const N = 1100
        var seekID MessageID
-       for i := 0; i < 10; i++ {
+       for i := 0; i < N; i++ {
                id, err := producer.Send(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                })
                assert.Nil(t, err)
 
-               if i == 4 {
+               if i == N-50 {
                        seekID = id
                }
        }
 
-       for i := 0; i < N; i++ {
+       // Don't consume all messages so some stay in queues
+       for i := 0; i < N-20; i++ {
                msg, err := consumer.Receive(ctx)
                assert.Nil(t, err)
                assert.Equal(t, fmt.Sprintf("hello-%d", i), 
string(msg.Payload()))
@@ -820,7 +822,7 @@ func TestConsumerSeek(t *testing.T) {
 
        msg, err := consumer.Receive(ctx)
        assert.Nil(t, err)
-       assert.Equal(t, "hello-4", string(msg.Payload()))
+       assert.Equal(t, fmt.Sprintf("hello-%d", N-50), string(msg.Payload()))
 }
 
 func TestConsumerSeekByTime(t *testing.T) {
@@ -847,19 +849,21 @@ func TestConsumerSeekByTime(t *testing.T) {
        assert.Nil(t, err)
        defer consumer.Close()
 
-       const N = 10
+       // Use value bigger than 1000 to full-fill queue channel with size 1000 
and message channel with size 10
+       const N = 1100
        resetTimeStr := "100s"
        retentionTimeInSecond, err := 
internal.ParseRelativeTimeInSeconds(resetTimeStr)
        assert.Nil(t, err)
 
-       for i := 0; i < 10; i++ {
+       for i := 0; i < N; i++ {
                _, err := producer.Send(ctx, &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                })
                assert.Nil(t, err)
        }
 
-       for i := 0; i < N; i++ {
+       // Don't consume all messages so some stay in queues
+       for i := 0; i < N-20; i++ {
                msg, err := consumer.Receive(ctx)
                assert.Nil(t, err)
                assert.Equal(t, fmt.Sprintf("hello-%d", i), 
string(msg.Payload()))

Reply via email to