This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 875f6ba8 fix: seek race (#1265)
875f6ba8 is described below

commit 875f6ba883ebfcc9eb02fb339bcb3446bd0cbde4
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Nov 4 14:51:30 2024 +0800

    fix: seek race (#1265)
    
    * fix: use same goroutine to perform reconnect and seek
    
    * fix: pause dispatch message before performing seek
    
    * use chan struct{} instead of chan bool
    
    * add log when seek
---
 pulsar/consumer_impl.go      |  22 +++++----
 pulsar/consumer_partition.go | 109 +++++++++++++++++++++++++------------------
 2 files changed, 76 insertions(+), 55 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 92376ac1..740a7df9 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -712,22 +712,29 @@ func (c *consumer) Seek(msgID MessageID) error {
                return err
        }
 
-       if err := c.consumers[msgID.PartitionIdx()].Seek(msgID); err != nil {
-               return err
-       }
-
+       consumer := c.consumers[msgID.PartitionIdx()]
+       consumer.pauseDispatchMessage()
        // clear messageCh
        for len(c.messageCh) > 0 {
                <-c.messageCh
        }
 
-       return nil
+       return consumer.Seek(msgID)
 }
 
 func (c *consumer) SeekByTime(time time.Time) error {
        c.Lock()
        defer c.Unlock()
        var errs error
+
+       for _, cons := range c.consumers {
+               cons.pauseDispatchMessage()
+       }
+       // clear messageCh
+       for len(c.messageCh) > 0 {
+               <-c.messageCh
+       }
+
        // run SeekByTime on every partition of topic
        for _, cons := range c.consumers {
                if err := cons.SeekByTime(time); err != nil {
@@ -736,11 +743,6 @@ func (c *consumer) SeekByTime(time time.Time) error {
                }
        }
 
-       // clear messageCh
-       for len(c.messageCh) > 0 {
-               <-c.messageCh
-       }
-
        return errs
 }
 
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 0e8274e6..4e8fba5a 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -24,6 +24,7 @@ import (
        "math"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/backoff"
@@ -185,6 +186,16 @@ type partitionConsumer struct {
 
        redirectedClusterURI string
        backoffPolicyFunc    func() backoff.Policy
+
+       dispatcherSeekingControlCh chan struct{}
+       isSeeking                  atomic.Bool
+}
+
+// pauseDispatchMessage is used to discard the message in the dispatcher 
goroutine.
+// This method will be called when the parent consumer performs the seek 
operation.
+// After the seek operation, the dispatcher will continue dispatching messages 
automatically.
+func (pc *partitionConsumer) pauseDispatchMessage() {
+       pc.dispatcherSeekingControlCh <- struct{}{}
 }
 
 func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
@@ -329,27 +340,28 @@ func newPartitionConsumer(parent Consumer, client 
*client, options *partitionCon
        }
 
        pc := &partitionConsumer{
-               parentConsumer:       parent,
-               client:               client,
-               options:              options,
-               topic:                options.topic,
-               name:                 options.consumerName,
-               consumerID:           client.rpcClient.NewConsumerID(),
-               partitionIdx:         int32(options.partitionIdx),
-               eventsCh:             make(chan interface{}, 10),
-               maxQueueSize:         int32(options.receiverQueueSize),
-               queueCh:              make(chan *message, 
options.receiverQueueSize),
-               startMessageID:       atomicMessageID{msgID: 
options.startMessageID},
-               connectedCh:          make(chan struct{}),
-               messageCh:            messageCh,
-               connectClosedCh:      make(chan *connectionClosed, 1),
-               closeCh:              make(chan struct{}),
-               clearQueueCh:         make(chan func(id *trackingMessageID)),
-               compressionProviders: sync.Map{},
-               dlq:                  dlq,
-               metrics:              metrics,
-               schemaInfoCache:      newSchemaInfoCache(client, options.topic),
-               backoffPolicyFunc:    boFunc,
+               parentConsumer:             parent,
+               client:                     client,
+               options:                    options,
+               topic:                      options.topic,
+               name:                       options.consumerName,
+               consumerID:                 client.rpcClient.NewConsumerID(),
+               partitionIdx:               int32(options.partitionIdx),
+               eventsCh:                   make(chan interface{}, 10),
+               maxQueueSize:               int32(options.receiverQueueSize),
+               queueCh:                    make(chan *message, 
options.receiverQueueSize),
+               startMessageID:             atomicMessageID{msgID: 
options.startMessageID},
+               connectedCh:                make(chan struct{}),
+               messageCh:                  messageCh,
+               connectClosedCh:            make(chan *connectionClosed, 1),
+               closeCh:                    make(chan struct{}),
+               clearQueueCh:               make(chan func(id 
*trackingMessageID)),
+               compressionProviders:       sync.Map{},
+               dlq:                        dlq,
+               metrics:                    metrics,
+               schemaInfoCache:            newSchemaInfoCache(client, 
options.topic),
+               backoffPolicyFunc:          boFunc,
+               dispatcherSeekingControlCh: make(chan struct{}),
        }
        if pc.options.autoReceiverQueueSize {
                pc.currentQueueSize.Store(initialReceiverQueueSize)
@@ -1440,17 +1452,20 @@ func (pc *partitionConsumer) dispatcher() {
                        }
                        nextMessageSize = queueMsg.size()
 
-                       if pc.dlq.shouldSendToDlq(&nextMessage) {
-                               // pass the message to the DLQ router
-                               pc.metrics.DlqCounter.Inc()
-                               messageCh = pc.dlq.Chan()
+                       if !pc.isSeeking.Load() {
+                               if pc.dlq.shouldSendToDlq(&nextMessage) {
+                                       // pass the message to the DLQ router
+                                       pc.metrics.DlqCounter.Inc()
+                                       messageCh = pc.dlq.Chan()
+                               } else {
+                                       // pass the message to application 
channel
+                                       messageCh = pc.messageCh
+                               }
+                               pc.metrics.PrefetchedMessages.Dec()
+                               
pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
                        } else {
-                               // pass the message to application channel
-                               messageCh = pc.messageCh
+                               pc.log.Debug("skip dispatching messages when 
seeking")
                        }
-
-                       pc.metrics.PrefetchedMessages.Dec()
-                       
pc.metrics.PrefetchedBytes.Sub(float64(len(queueMsg.payLoad)))
                } else {
                        queueCh = pc.queueCh
                }
@@ -1483,6 +1498,13 @@ func (pc *partitionConsumer) dispatcher() {
                                pc.log.WithError(err).Error("unable to send 
initial permits to broker")
                        }
 
+               case _, ok := <-pc.dispatcherSeekingControlCh:
+                       if !ok {
+                               return
+                       }
+                       pc.log.Debug("received dispatcherSeekingControlCh, set 
isSeek to true")
+                       pc.isSeeking.Store(true)
+
                case msg, ok := <-queueCh:
                        if !ok {
                                return
@@ -1587,22 +1609,16 @@ func (pc *partitionConsumer) runEventsLoop() {
        }()
        pc.log.Debug("get into runEventsLoop")
 
-       go func() {
-               for {
-                       select {
-                       case <-pc.closeCh:
-                               pc.log.Info("close consumer, exit reconnect")
-                               return
-                       case connectionClosed := <-pc.connectClosedCh:
-                               pc.log.Debug("runEventsLoop will reconnect")
-                               pc.reconnectToBroker(connectionClosed)
-                       }
-               }
-       }()
-
        for {
-               for i := range pc.eventsCh {
-                       switch v := i.(type) {
+               select {
+               case <-pc.closeCh:
+                       pc.log.Info("close consumer, exit reconnect")
+                       return
+               case connectionClosed := <-pc.connectClosedCh:
+                       pc.log.Debug("runEventsLoop will reconnect")
+                       pc.reconnectToBroker(connectionClosed)
+               case event := <-pc.eventsCh:
+                       switch v := event.(type) {
                        case *ackRequest:
                                pc.internalAck(v)
                        case *ackWithTxnRequest:
@@ -1680,6 +1696,9 @@ func (pc *partitionConsumer) internalClose(req 
*closeRequest) {
 }
 
 func (pc *partitionConsumer) reconnectToBroker(connectionClosed 
*connectionClosed) {
+       if pc.isSeeking.CompareAndSwap(true, false) {
+               pc.log.Debug("seek operation triggers reconnection, and reset 
isSeeking")
+       }
        var (
                maxRetry                                    int
                delayReconnectTime, totalDelayReconnectTime time.Duration

Reply via email to