This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 92c6e284 improve: use ctx and timer instead sleep (#1256)
92c6e284 is described below

commit 92c6e284cfcd1eb921116da6482b79e5500e03ae
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Nov 15 13:13:23 2024 +0800

    improve: use ctx and timer instead sleep (#1256)
    
    * improve: use ctx and timer instead sleep
    
    * Address comment
---
 pulsar/consumer_partition.go             | 79 +++++++++++++++-----------------
 pulsar/consumer_test.go                  |  2 +-
 pulsar/dlq_router.go                     | 23 ++++++----
 pulsar/internal/http_client.go           | 37 ++++++++-------
 pulsar/internal/retry.go                 | 62 +++++++++++++++++++++++++
 pulsar/internal/retry_test.go            | 58 +++++++++++++++++++++++
 pulsar/internal/rpc_client.go            | 20 ++++----
 pulsar/producer_partition.go             | 65 +++++++++++++-------------
 pulsar/producer_test.go                  |  2 +-
 pulsar/reader_test.go                    |  2 +-
 pulsar/retry_router.go                   | 23 ++++++----
 pulsar/transaction_coordinator_client.go | 32 ++++++-------
 12 files changed, 263 insertions(+), 142 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 471d45a3..9f0057c3 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
        "container/list"
+       "context"
        "encoding/hex"
        "fmt"
        "math"
@@ -612,7 +613,6 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
                pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer")
                return nil, errors.New("failed to getLastMessageID for the closing or closed consumer")
        }
-       remainTime := pc.client.operationTimeout
        bo := pc.backoffPolicyFunc()
        request := func() (*trackingMessageID, error) {
                req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
@@ -622,23 +622,20 @@ func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) {
                <-req.doneCh
                return req.msgID, req.err
        }
-       for {
-               msgID, err := request()
-               if err == nil {
-                       return msgID, nil
-               }
-               if remainTime <= 0 {
-                       pc.log.WithError(err).Error("Failed to getLastMessageID")
-                       return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
-               }
+
+       ctx, cancel := context.WithTimeout(context.Background(), pc.client.operationTimeout)
+       defer cancel()
+       res, err := internal.Retry(ctx, request, func(err error) time.Duration {
                nextDelay := bo.Next()
-               if nextDelay > remainTime {
-                       nextDelay = remainTime
-               }
-               remainTime -= nextDelay
                pc.log.WithError(err).Errorf("Failed to get last message id from broker, retrying in %v...", nextDelay)
-               time.Sleep(nextDelay)
+               return nextDelay
+       })
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to getLastMessageID")
+               return nil, fmt.Errorf("failed to getLastMessageID due to %w", err)
        }
+
+       return res, nil
 }
 
 func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
@@ -1805,8 +1802,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
                pc.log.Debug("seek operation triggers reconnection, and reset isSeeking")
        }
        var (
-               maxRetry                                    int
-               delayReconnectTime, totalDelayReconnectTime time.Duration
+               maxRetry int
        )
 
        if pc.options.maxReconnectToBroker == nil {
@@ -1816,50 +1812,39 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
        }
        bo := pc.backoffPolicyFunc()
 
-       for maxRetry != 0 {
-               if pc.getConsumerState() != consumerReady {
-                       // Consumer is already closing
-                       pc.log.Info("consumer state not ready, exit reconnect")
-                       return
-               }
-
-               var assignedBrokerURL string
+       var assignedBrokerURL string
+       if connectionClosed != nil && connectionClosed.HasURL() {
+               assignedBrokerURL = connectionClosed.assignedBrokerURL
+       }
 
-               if connectionClosed != nil && connectionClosed.HasURL() {
-                       delayReconnectTime = 0
-                       assignedBrokerURL = connectionClosed.assignedBrokerURL
-                       connectionClosed = nil // Attempt connecting to the assigned broker just once
-               } else {
-                       delayReconnectTime = bo.Next()
+       opFn := func() (struct{}, error) {
+               if maxRetry == 0 {
+                       return struct{}{}, nil
                }
-               totalDelayReconnectTime += delayReconnectTime
-
-               pc.log.WithFields(log.Fields{
-                       "assignedBrokerURL":  assignedBrokerURL,
-                       "delayReconnectTime": delayReconnectTime,
-               }).Info("Reconnecting to broker")
-               time.Sleep(delayReconnectTime)
 
-               // double check
                if pc.getConsumerState() != consumerReady {
                        // Consumer is already closing
                        pc.log.Info("consumer state not ready, exit reconnect")
-                       return
+                       return struct{}{}, nil
                }
 
                err := pc.grabConn(assignedBrokerURL)
+               if assignedBrokerURL != "" {
+                       // Attempt connecting to the assigned broker just once
+                       assignedBrokerURL = ""
+               }
                if err == nil {
                        // Successfully reconnected
                        pc.log.Info("Reconnected consumer to broker")
                        bo.Reset()
-                       return
+                       return struct{}{}, nil
                }
                pc.log.WithError(err).Error("Failed to create consumer at reconnect")
                errMsg := err.Error()
                if strings.Contains(errMsg, errMsgTopicNotFound) {
                        // when topic is deleted, we should give up reconnection.
                        pc.log.Warn("Topic Not Found.")
-                       break
+                       return struct{}{}, nil
                }
 
                if maxRetry > 0 {
@@ -1869,7 +1854,17 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
                if maxRetry == 0 || bo.IsMaxBackoffReached() {
                        pc.metrics.ConsumersReconnectMaxRetry.Inc()
                }
+
+               return struct{}{}, err
        }
+       _, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration {
+               delayReconnectTime := bo.Next()
+               pc.log.WithFields(log.Fields{
+                       "assignedBrokerURL":  assignedBrokerURL,
+                       "delayReconnectTime": delayReconnectTime,
+               }).Info("Reconnecting to broker")
+               return delayReconnectTime
+       })
 }
 
 func (pc *partitionConsumer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 2524f681..83521dd4 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -129,7 +129,7 @@ func TestConsumerConnectError(t *testing.T) {
        assert.Nil(t, consumer)
        assert.NotNil(t, err)
 
-       assert.Equal(t, err.Error(), "connection error")
+       assert.ErrorContains(t, err, "connection error")
 }
 
 func TestBatchMessageReceive(t *testing.T) {
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 00c7e03b..7d908ff6 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -22,6 +22,8 @@ import (
        "fmt"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/internal"
+
        "github.com/apache/pulsar-client-go/pulsar/backoff"
 
        "github.com/apache/pulsar-client-go/pulsar/log"
@@ -165,7 +167,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
 
        // Retry to create producer indefinitely
        bo := r.backOffPolicyFunc()
-       for {
+       opFn := func() (Producer, error) {
                opt := r.policy.ProducerOptions
                opt.Topic = r.policy.DeadLetterTopic
                opt.Schema = schema
@@ -179,14 +181,17 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
                if r.policy.ProducerOptions.CompressionType == NoCompression {
                        opt.CompressionType = LZ4
                }
-               producer, err := r.client.CreateProducer(opt)
+               return r.client.CreateProducer(opt)
+       }
 
-               if err != nil {
-                       r.log.WithError(err).Error("Failed to create DLQ producer")
-                       time.Sleep(bo.Next())
-                       continue
-               }
-               r.producer = producer
-               return producer
+       res, err := internal.Retry(context.Background(), opFn, func(err error) time.Duration {
+               r.log.WithError(err).Error("Failed to create DLQ producer")
+               return bo.Next()
+       })
+
+       if err == nil {
+               r.producer = res
        }
+
+       return res
 }
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index 632d5a4f..b7dea1fe 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -19,6 +19,7 @@ package internal
 
 import (
        "bytes"
+       "context"
        "crypto/tls"
        "crypto/x509"
        "encoding/json"
@@ -146,25 +147,27 @@ func (c *httpClient) MakeRequest(method, endpoint string) (*http.Response, error
 }
 
 func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]string) error {
-       _, err := c.GetWithQueryParams(endpoint, obj, params, true)
-       if _, ok := err.(*url.Error); ok {
-               // We can retry this kind of requests over a connection error because they're
-               // not specific to a particular broker.
-               bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond)
-               startTime := time.Now()
-               var retryTime time.Duration
-
-               for time.Since(startTime) < c.requestTimeout {
-                       retryTime = bo.Next()
-                       c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
-                       time.Sleep(retryTime)
-                       _, err = c.GetWithQueryParams(endpoint, obj, params, true)
-                       if _, ok := err.(*url.Error); !ok {
-                               // We either succeeded or encountered a non connection error
-                               break
-                       }
+       var err error
+       opFn := func() (struct{}, error) {
+               _, err = c.GetWithQueryParams(endpoint, obj, params, true)
+               if _, ok := err.(*url.Error); ok {
+                       // We can retry this kind of requests over a connection error because they're
+                       // not specific to a particular broker.
+                       return struct{}{}, err
                }
+               return struct{}{}, nil
        }
+
+       bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond)
+
+       ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
+       defer cancel()
+
+       _, _ = Retry(ctx, opFn, func(_ error) time.Duration {
+               retryTime := bo.Next()
+               c.log.Debugf("Retrying httpRequest in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
+               return retryTime
+       })
        return err
 }
 
diff --git a/pulsar/internal/retry.go b/pulsar/internal/retry.go
new file mode 100644
index 00000000..2f7bf658
--- /dev/null
+++ b/pulsar/internal/retry.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "context"
+       "errors"
+       "time"
+)
+
+type OpFn[T any] func() (T, error)
+
+// Retry the given operation until the returned error is nil or the context is done.
+func Retry[T any](ctx context.Context, op OpFn[T], nextDuration func(error) time.Duration) (T, error) {
+       var (
+               timer *time.Timer
+               res   T
+               err   error
+       )
+
+       cleanTimer := func() {
+               if timer != nil {
+                       timer.Stop()
+               }
+       }
+       defer cleanTimer()
+
+       for {
+               res, err = op()
+               if err == nil {
+                       return res, nil
+               }
+
+               duration := nextDuration(err)
+               if timer == nil {
+                       timer = time.NewTimer(duration)
+               } else {
+                       timer.Reset(duration)
+               }
+
+               select {
+               case <-ctx.Done():
+                       return res, errors.Join(ctx.Err(), err)
+               case <-timer.C:
+               }
+       }
+}
diff --git a/pulsar/internal/retry_test.go b/pulsar/internal/retry_test.go
new file mode 100644
index 00000000..ab9f4dc6
--- /dev/null
+++ b/pulsar/internal/retry_test.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "context"
+       "errors"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+)
+
+func TestRetryWithCtxBackground(t *testing.T) {
+       ctx := context.Background()
+       i := 0
+       res, err := Retry(ctx, func() (string, error) {
+               if i == 2 {
+                       return "ok", nil
+               }
+               i++
+               return "", errors.New("error")
+       }, func(_ error) time.Duration {
+               return 1 * time.Second
+       })
+       require.NoError(t, err)
+       require.Equal(t, "ok", res)
+}
+
+func TestRetryWithCtxTimeout(t *testing.T) {
+       ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
+       defer cancelFn()
+       retryErr := errors.New("error")
+       res, err := Retry(ctx, func() (string, error) {
+               return "", retryErr
+       }, func(err error) time.Duration {
+               require.Equal(t, retryErr, err)
+               return 1 * time.Second
+       })
+       require.ErrorIs(t, err, context.DeadlineExceeded)
+       require.ErrorContains(t, err, retryErr.Error())
+       require.Equal(t, "", res)
+}
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 0593203a..9a88a167 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -18,6 +18,7 @@
 package internal
 
 import (
+       "context"
        "errors"
        "fmt"
        "net/url"
@@ -117,27 +118,26 @@ func (c *rpcClient) requestToHost(serviceNameResolver *ServiceNameResolver,
        requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
        var err error
        var host *url.URL
-       var rpcResult *RPCResult
-       startTime := time.Now()
        bo := backoff.NewDefaultBackoffWithInitialBackOff(100 * time.Millisecond)
        // we can retry these requests because this kind of request is
        // not specific to any particular broker
-       for time.Since(startTime) < c.requestTimeout {
+       opFn := func() (*RPCResult, error) {
                host, err = (*serviceNameResolver).ResolveHost()
                if err != nil {
                        c.log.WithError(err).Errorf("rpc client failed to resolve host")
                        return nil, err
                }
-               rpcResult, err = c.Request(host, host, requestID, cmdType, message)
-               // success we got a response
-               if err == nil {
-                       break
-               }
+               return c.Request(host, host, requestID, cmdType, message)
+       }
+
+       ctx, cancel := context.WithTimeout(context.Background(), c.requestTimeout)
+       defer cancel()
 
+       rpcResult, err := Retry(ctx, opFn, func(_ error) time.Duration {
                retryTime := bo.Next()
                c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
-               time.Sleep(retryTime)
-       }
+               return retryTime
+       })
 
        return rpcResult, err
 }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b1fc3f02..c5ae259a 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -471,8 +471,7 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVer
 
 func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed) {
        var (
-               maxRetry           int
-               delayReconnectTime time.Duration
+               maxRetry int
        )
        if p.options.MaxReconnectToBroker == nil {
                maxRetry = -1
@@ -482,49 +481,39 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
 
        bo := p.backOffPolicyFunc()
 
-       for maxRetry != 0 {
-               select {
-               case <-p.ctx.Done():
-                       return
-               default:
-               }
+       var assignedBrokerURL string
+       if connectionClosed != nil && connectionClosed.HasURL() {
+               assignedBrokerURL = connectionClosed.assignedBrokerURL
+       }
 
-               if p.getProducerState() != producerReady {
-                       // Producer is already closing
-                       p.log.Info("producer state not ready, exit reconnect")
-                       return
+       opFn := func() (struct{}, error) {
+               if maxRetry == 0 {
+                       return struct{}{}, nil
                }
 
-               var assignedBrokerURL string
-
-               if connectionClosed != nil && connectionClosed.HasURL() {
-                       delayReconnectTime = 0
-                       assignedBrokerURL = connectionClosed.assignedBrokerURL
-                       connectionClosed = nil // Only attempt once
-               } else {
-                       delayReconnectTime = bo.Next()
+               select {
+               case <-p.ctx.Done():
+                       return struct{}{}, nil
+               default:
                }
 
-               p.log.WithFields(log.Fields{
-                       "assignedBrokerURL":  assignedBrokerURL,
-                       "delayReconnectTime": delayReconnectTime,
-               }).Info("Reconnecting to broker")
-               time.Sleep(delayReconnectTime)
-
-               // double check
                if p.getProducerState() != producerReady {
                        // Producer is already closing
                        p.log.Info("producer state not ready, exit reconnect")
-                       return
+                       return struct{}{}, nil
                }
 
                atomic.AddUint64(&p.epoch, 1)
                err := p.grabCnx(assignedBrokerURL)
+               if assignedBrokerURL != "" {
+                       // Only attempt once
+                       assignedBrokerURL = ""
+               }
                if err == nil {
                        // Successfully reconnected
                        p.log.WithField("cnx", p._getConn().ID()).Info("Reconnected producer to broker")
                        bo.Reset()
-                       return
+                       return struct{}{}, nil
                }
                p.log.WithError(err).Error("Failed to create producer at reconnect")
                errMsg := err.Error()
@@ -532,25 +521,25 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
                        // when topic is deleted, we should give up reconnection.
                        p.log.Warn("Topic not found, stop reconnecting, close the producer")
                        p.doClose(joinErrors(ErrTopicNotfound, err))
-                       break
+                       return struct{}{}, nil
                }
 
                if strings.Contains(errMsg, errMsgTopicTerminated) {
                        p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
                        p.doClose(joinErrors(ErrTopicTerminated, err))
-                       break
+                       return struct{}{}, nil
                }
 
                if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
                        p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
                        p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
-                       break
+                       return struct{}{}, nil
                }
 
                if strings.Contains(errMsg, errMsgProducerFenced) {
                        p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
                        p.doClose(joinErrors(ErrProducerFenced, err))
-                       break
+                       return struct{}{}, nil
                }
 
                if maxRetry > 0 {
@@ -560,7 +549,17 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
                if maxRetry == 0 || bo.IsMaxBackoffReached() {
                        p.metrics.ProducersReconnectMaxRetry.Inc()
                }
+
+               return struct{}{}, err
        }
+       _, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration {
+               delayReconnectTime := bo.Next()
+               p.log.WithFields(log.Fields{
+                       "assignedBrokerURL":  assignedBrokerURL,
+                       "delayReconnectTime": delayReconnectTime,
+               }).Info("Reconnecting to broker")
+               return delayReconnectTime
+       })
 }
 
 func (p *partitionProducer) runEventsLoop() {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index b58c0608..24939a82 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -73,7 +73,7 @@ func TestProducerConnectError(t *testing.T) {
        assert.Nil(t, producer)
        assert.NotNil(t, err)
 
-       assert.Equal(t, err.Error(), "connection error")
+       assert.ErrorContains(t, err, "connection error")
 }
 
 func TestProducerNoTopic(t *testing.T) {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index d2fa5597..83653570 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -221,7 +221,7 @@ func TestReaderConnectError(t *testing.T) {
        assert.Nil(t, reader)
        assert.NotNil(t, err)
 
-       assert.Equal(t, err.Error(), "connection error")
+       assert.ErrorContains(t, err, "connection error")
 }
 
 func TestReaderOnSpecificMessage(t *testing.T) {
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index a9a67adb..198371a3 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -21,6 +21,8 @@ import (
        "context"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/internal"
+
        "github.com/apache/pulsar-client-go/pulsar/backoff"
 
        "github.com/apache/pulsar-client-go/pulsar/log"
@@ -135,7 +137,7 @@ func (r *retryRouter) getProducer() Producer {
 
        // Retry to create producer indefinitely
        bo := r.backOffPolicyFunc()
-       for {
+       opFn := func() (Producer, error) {
                opt := r.policy.ProducerOptions
                opt.Topic = r.policy.RetryLetterTopic
                // the origin code sets to LZ4 compression with no options
@@ -144,14 +146,15 @@ func (r *retryRouter) getProducer() Producer {
                        opt.CompressionType = LZ4
                }
 
-               producer, err := r.client.CreateProducer(opt)
-
-               if err != nil {
-                       r.log.WithError(err).Error("Failed to create RLQ producer")
-                       time.Sleep(bo.Next())
-                       continue
-               }
-               r.producer = producer
-               return producer
+               return r.client.CreateProducer(opt)
+       }
+       res, err := internal.Retry(context.Background(), opFn, func(err error) time.Duration {
+               r.log.WithError(err).Error("Failed to create RLQ producer")
+               return bo.Next()
+       })
+       if err == nil {
+               r.producer = res
        }
+
+       return res
 }
diff --git a/pulsar/transaction_coordinator_client.go b/pulsar/transaction_coordinator_client.go
index afde5427..e66b06ef 100644
--- a/pulsar/transaction_coordinator_client.go
+++ b/pulsar/transaction_coordinator_client.go
@@ -147,41 +147,37 @@ func (t *transactionHandler) reconnectToBroker() {
        var delayReconnectTime time.Duration
        var defaultBackoff = backoff.DefaultBackoff{}
 
-       for {
+       opFn := func() (struct{}, error) {
                if t.getState() == txnHandlerClosed {
                        // The handler is already closing
                        t.log.Info("transaction handler is closed, exit reconnect")
-                       return
-               }
-
-               delayReconnectTime = defaultBackoff.Next()
-
-               t.log.WithFields(log.Fields{
-                       "delayReconnectTime": delayReconnectTime,
-               }).Info("Transaction handler will reconnect to the transaction coordinator")
-               time.Sleep(delayReconnectTime)
-
-               // double check
-               if t.getState() == txnHandlerClosed {
-                       // Txn handler is already closing
-                       t.log.Info("transaction handler is closed, exit reconnect")
-                       return
+                       return struct{}{}, nil
                }
 
                err := t.grabConn()
                if err == nil {
                        // Successfully reconnected
                        t.log.Info("Reconnected transaction handler to broker")
-                       return
+                       return struct{}{}, nil
                }
+
                t.log.WithError(err).Error("Failed to create transaction handler at reconnect")
                errMsg := err.Error()
                if strings.Contains(errMsg, errMsgTopicNotFound) {
                        // when topic is deleted, we should give up reconnection.
                        t.log.Warn("Topic Not Found")
-                       break
+                       return struct{}{}, nil
                }
+               return struct{}{}, err
        }
+
+       _, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration {
+               delayReconnectTime = defaultBackoff.Next()
+               t.log.WithFields(log.Fields{
+                       "delayReconnectTime": delayReconnectTime,
+               }).Info("Transaction handler will reconnect to the transaction coordinator")
+               return delayReconnectTime
+       })
 }
 
 func (t *transactionHandler) checkRetriableError(err error, op any) bool {

Reply via email to