This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.12.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit a5f06793bd4365012641a79825267a51d4ce43c9
Author: Zike Yang <[email protected]>
AuthorDate: Wed Feb 28 18:39:00 2024 +0800

    [fix] Fix Infinite Loop in Reader's `HasNext` Function (#1182)
    
    Fixes #1171
    
    ### Motivation
    
    If `getLastMessageId` continually fails, the reader.HasNext can get stuck 
in an infinite loop. Without any backoff, the reader would keep trying forever.
    
    ### Modifications
    
    - Implemented a backoff policy for `getLastMessageID`.
    - If HasNext fails, it now returns false.
    
    #### Should the reader.HasNext return `false` in case of failure?
    
    Currently, the `HasNext` method doesn't report errors. However, failure is 
still possible — for instance, if `getLastMessageID` repeatedly fails and hits 
the retry limit. An option is to keep trying forever, but this would stall all 
user code. This isn't user-friendly, so I rejected this solution.
    
    #### Why we couldn't utilize the BackOffPolicy in the Reader Options
    
    The `HasNext` retry mechanism requires the use of `IsMaxBackoffReached` for 
the backoff. But it isn't exposed in the `BackOffPolicy` interface. Introducing 
a new method to the `BackOffPolicy` would introduce breaking changes for the 
user backoff implementation. So, I chose not to implement it. Before we do it, 
we need to refine the BackOffPolicy.
    
    (cherry picked from commit 88a8d85cf6d6a4f282a5b39a2140a7bb06ba0f3b)
---
 pulsar/client_impl.go        | 24 +++++++++--------
 pulsar/consumer_partition.go | 53 +++++++++++++++++++++++++-----------
 pulsar/reader.go             |  1 +
 pulsar/reader_test.go        | 64 ++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 115 insertions(+), 27 deletions(-)

diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 7daf6f62..65aed3b9 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -40,14 +40,15 @@ const (
 )
 
 type client struct {
-       cnxPool       internal.ConnectionPool
-       rpcClient     internal.RPCClient
-       handlers      internal.ClientHandlers
-       lookupService internal.LookupService
-       metrics       *internal.Metrics
-       tcClient      *transactionCoordinatorClient
-       memLimit      internal.MemoryLimitController
-       closeOnce     sync.Once
+       cnxPool          internal.ConnectionPool
+       rpcClient        internal.RPCClient
+       handlers         internal.ClientHandlers
+       lookupService    internal.LookupService
+       metrics          *internal.Metrics
+       tcClient         *transactionCoordinatorClient
+       memLimit         internal.MemoryLimitController
+       closeOnce        sync.Once
+       operationTimeout time.Duration
 
        log log.Logger
 }
@@ -161,9 +162,10 @@ func newClient(options ClientOptions) (Client, error) {
        c := &client{
                cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, 
connectionTimeout, keepAliveInterval,
                        maxConnectionsPerHost, logger, metrics, 
connectionMaxIdleTime),
-               log:      logger,
-               metrics:  metrics,
-               memLimit: internal.NewMemoryLimitController(memLimitBytes, 
defaultMemoryLimitTriggerThreshold),
+               log:              logger,
+               metrics:          metrics,
+               memLimit:         
internal.NewMemoryLimitController(memLimitBytes, 
defaultMemoryLimitTriggerThreshold),
+               operationTimeout: operationTimeout,
        }
        serviceNameResolver := internal.NewPulsarServiceNameResolver(url)
 
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 3572a522..162565b2 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -570,15 +570,41 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub 
*unsubscribeRequest) {
 
 func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
        if state := pc.getConsumerState(); state == consumerClosed || state == 
consumerClosing {
-               pc.log.WithField("state", state).Error("Failed to redeliver 
closing or closed consumer")
-               return nil, errors.New("failed to redeliver closing or closed 
consumer")
+               pc.log.WithField("state", state).Error("Failed to 
getLastMessageID for the closing or closed consumer")
+               return nil, errors.New("failed to getLastMessageID for the 
closing or closed consumer")
        }
-       req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
-       pc.eventsCh <- req
+       remainTime := pc.client.operationTimeout
+       var backoff internal.BackoffPolicy
+       if pc.options.backoffPolicy != nil {
+               backoff = pc.options.backoffPolicy
+       } else {
+               backoff = &internal.DefaultBackoff{}
+       }
+       request := func() (*trackingMessageID, error) {
+               req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
+               pc.eventsCh <- req
 
-       // wait for the request to complete
-       <-req.doneCh
-       return req.msgID, req.err
+               // wait for the request to complete
+               <-req.doneCh
+               return req.msgID, req.err
+       }
+       for {
+               msgID, err := request()
+               if err == nil {
+                       return msgID, nil
+               }
+               if remainTime <= 0 {
+                       pc.log.WithError(err).Error("Failed to 
getLastMessageID")
+                       return nil, fmt.Errorf("failed to getLastMessageID due 
to %w", err)
+               }
+               nextDelay := backoff.Next()
+               if nextDelay > remainTime {
+                       nextDelay = remainTime
+               }
+               remainTime -= nextDelay
+               pc.log.WithError(err).Errorf("Failed to get last message id 
from broker, retrying in %v...", nextDelay)
+               time.Sleep(nextDelay)
+       }
 }
 
 func (pc *partitionConsumer) internalGetLastMessageID(req 
*getLastMsgIDRequest) {
@@ -1987,16 +2013,11 @@ func (pc *partitionConsumer) hasNext() bool {
                return true
        }
 
-       for {
-               lastMsgID, err := pc.getLastMessageID()
-               if err != nil {
-                       pc.log.WithError(err).Error("Failed to get last message 
id from broker")
-                       continue
-               } else {
-                       pc.lastMessageInBroker = lastMsgID
-                       break
-               }
+       lastMsgID, err := pc.getLastMessageID()
+       if err != nil {
+               return false
        }
+       pc.lastMessageInBroker = lastMsgID
 
        return pc.hasMoreMessages()
 }
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 1c5235d4..4daa8890 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -113,6 +113,7 @@ type Reader interface {
        Next(context.Context) (Message, error)
 
        // HasNext checks if there is any message available to read from the 
current position
+       // If there is any errors, it will return false
        HasNext() bool
 
        // Close the reader and stop the broker to push more messages
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index ccf52875..78c222da 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
        "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
        "github.com/google/uuid"
+       "github.com/pkg/errors"
        "github.com/stretchr/testify/assert"
 )
 
@@ -1023,3 +1024,66 @@ func createPartitionedTopic(topic string, n int) error {
        }
        return nil
 }
+
+func TestReaderHasNextFailed(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       topic := newTopicName()
+       r, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessageID(),
+       })
+       assert.Nil(t, err)
+       r.(*reader).c.consumers[0].state.Store(consumerClosing)
+       assert.False(t, r.HasNext())
+}
+
+func TestReaderHasNextRetryFailed(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL:              serviceURL,
+               OperationTimeout: 2 * time.Second,
+       })
+       assert.Nil(t, err)
+       topic := newTopicName()
+       r, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessageID(),
+       })
+       assert.Nil(t, err)
+
+       c := make(chan interface{})
+       defer close(c)
+
+       // Close the consumer events loop and assign a mock eventsCh
+       pc := r.(*reader).c.consumers[0]
+       pc.Close()
+       pc.state.Store(consumerReady)
+       pc.eventsCh = c
+
+       go func() {
+               for e := range c {
+                       req, ok := e.(*getLastMsgIDRequest)
+                       assert.True(t, ok, "unexpected event type")
+                       req.err = errors.New("expected error")
+                       close(req.doneCh)
+               }
+       }()
+       minTimer := time.NewTimer(1 * time.Second) // Timer to check if 
r.HasNext() blocked for at least 1s
+       maxTimer := time.NewTimer(3 * time.Second) // Timer to ensure 
r.HasNext() doesn't block for more than 3s
+       done := make(chan bool)
+       go func() {
+               assert.False(t, r.HasNext())
+               done <- true
+       }()
+
+       select {
+       case <-maxTimer.C:
+               t.Fatal("r.HasNext() blocked for more than 3s")
+       case <-done:
+               assert.False(t, minTimer.Stop(), "r.HasNext() did not block for 
at least 1s")
+               assert.True(t, maxTimer.Stop())
+       }
+
+}

Reply via email to