This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 92c6e284 improve: use ctx and timer instead sleep (#1256)
92c6e284 is described below
commit 92c6e284cfcd1eb921116da6482b79e5500e03ae
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Nov 15 13:13:23 2024 +0800
improve: use ctx and timer instead sleep (#1256)
* improve: use ctx and timer instead sleep
* Address comment
---
pulsar/consumer_partition.go | 79 +++++++++++++++-----------------
pulsar/consumer_test.go | 2 +-
pulsar/dlq_router.go | 23 ++++++----
pulsar/internal/http_client.go | 37 ++++++++-------
pulsar/internal/retry.go | 62 +++++++++++++++++++++++++
pulsar/internal/retry_test.go | 58 +++++++++++++++++++++++
pulsar/internal/rpc_client.go | 20 ++++----
pulsar/producer_partition.go | 65 +++++++++++++-------------
pulsar/producer_test.go | 2 +-
pulsar/reader_test.go | 2 +-
pulsar/retry_router.go | 23 ++++++----
pulsar/transaction_coordinator_client.go | 32 ++++++-------
12 files changed, 263 insertions(+), 142 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 471d45a3..9f0057c3 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -19,6 +19,7 @@ package pulsar
import (
"container/list"
+ "context"
"encoding/hex"
"fmt"
"math"
@@ -612,7 +613,6 @@ func (pc *partitionConsumer) getLastMessageID()
(*trackingMessageID, error) {
pc.log.WithField("state", state).Error("Failed to
getLastMessageID for the closing or closed consumer")
return nil, errors.New("failed to getLastMessageID for the
closing or closed consumer")
}
- remainTime := pc.client.operationTimeout
bo := pc.backoffPolicyFunc()
request := func() (*trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
@@ -622,23 +622,20 @@ func (pc *partitionConsumer) getLastMessageID()
(*trackingMessageID, error) {
<-req.doneCh
return req.msgID, req.err
}
- for {
- msgID, err := request()
- if err == nil {
- return msgID, nil
- }
- if remainTime <= 0 {
- pc.log.WithError(err).Error("Failed to
getLastMessageID")
- return nil, fmt.Errorf("failed to getLastMessageID due
to %w", err)
- }
+
+ ctx, cancel := context.WithTimeout(context.Background(),
pc.client.operationTimeout)
+ defer cancel()
+ res, err := internal.Retry(ctx, request, func(err error) time.Duration {
nextDelay := bo.Next()
- if nextDelay > remainTime {
- nextDelay = remainTime
- }
- remainTime -= nextDelay
pc.log.WithError(err).Errorf("Failed to get last message id
from broker, retrying in %v...", nextDelay)
- time.Sleep(nextDelay)
+ return nextDelay
+ })
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to getLastMessageID")
+ return nil, fmt.Errorf("failed to getLastMessageID due to %w",
err)
}
+
+ return res, nil
}
func (pc *partitionConsumer) internalGetLastMessageID(req
*getLastMsgIDRequest) {
@@ -1805,8 +1802,7 @@ func (pc *partitionConsumer)
reconnectToBroker(connectionClosed *connectionClose
pc.log.Debug("seek operation triggers reconnection, and reset
isSeeking")
}
var (
- maxRetry int
- delayReconnectTime, totalDelayReconnectTime time.Duration
+ maxRetry int
)
if pc.options.maxReconnectToBroker == nil {
@@ -1816,50 +1812,39 @@ func (pc *partitionConsumer)
reconnectToBroker(connectionClosed *connectionClose
}
bo := pc.backoffPolicyFunc()
- for maxRetry != 0 {
- if pc.getConsumerState() != consumerReady {
- // Consumer is already closing
- pc.log.Info("consumer state not ready, exit reconnect")
- return
- }
-
- var assignedBrokerURL string
+ var assignedBrokerURL string
+ if connectionClosed != nil && connectionClosed.HasURL() {
+ assignedBrokerURL = connectionClosed.assignedBrokerURL
+ }
- if connectionClosed != nil && connectionClosed.HasURL() {
- delayReconnectTime = 0
- assignedBrokerURL = connectionClosed.assignedBrokerURL
- connectionClosed = nil // Attempt connecting to the
assigned broker just once
- } else {
- delayReconnectTime = bo.Next()
+ opFn := func() (struct{}, error) {
+ if maxRetry == 0 {
+ return struct{}{}, nil
}
- totalDelayReconnectTime += delayReconnectTime
-
- pc.log.WithFields(log.Fields{
- "assignedBrokerURL": assignedBrokerURL,
- "delayReconnectTime": delayReconnectTime,
- }).Info("Reconnecting to broker")
- time.Sleep(delayReconnectTime)
- // double check
if pc.getConsumerState() != consumerReady {
// Consumer is already closing
pc.log.Info("consumer state not ready, exit reconnect")
- return
+ return struct{}{}, nil
}
err := pc.grabConn(assignedBrokerURL)
+ if assignedBrokerURL != "" {
+ // Attempt connecting to the assigned broker just once
+ assignedBrokerURL = ""
+ }
if err == nil {
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
bo.Reset()
- return
+ return struct{}{}, nil
}
pc.log.WithError(err).Error("Failed to create consumer at
reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up
reconnection.
pc.log.Warn("Topic Not Found.")
- break
+ return struct{}{}, nil
}
if maxRetry > 0 {
@@ -1869,7 +1854,17 @@ func (pc *partitionConsumer)
reconnectToBroker(connectionClosed *connectionClose
if maxRetry == 0 || bo.IsMaxBackoffReached() {
pc.metrics.ConsumersReconnectMaxRetry.Inc()
}
+
+ return struct{}{}, err
}
+ _, _ = internal.Retry(context.Background(), opFn, func(_ error)
time.Duration {
+ delayReconnectTime := bo.Next()
+ pc.log.WithFields(log.Fields{
+ "assignedBrokerURL": assignedBrokerURL,
+ "delayReconnectTime": delayReconnectTime,
+ }).Info("Reconnecting to broker")
+ return delayReconnectTime
+ })
}
func (pc *partitionConsumer) lookupTopic(brokerServiceURL string)
(*internal.LookupResult, error) {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 2524f681..83521dd4 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -129,7 +129,7 @@ func TestConsumerConnectError(t *testing.T) {
assert.Nil(t, consumer)
assert.NotNil(t, err)
- assert.Equal(t, err.Error(), "connection error")
+ assert.ErrorContains(t, err, "connection error")
}
func TestBatchMessageReceive(t *testing.T) {
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 00c7e03b..7d908ff6 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -22,6 +22,8 @@ import (
"fmt"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
+
"github.com/apache/pulsar-client-go/pulsar/backoff"
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -165,7 +167,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
// Retry to create producer indefinitely
bo := r.backOffPolicyFunc()
- for {
+ opFn := func() (Producer, error) {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.DeadLetterTopic
opt.Schema = schema
@@ -179,14 +181,17 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
if r.policy.ProducerOptions.CompressionType == NoCompression {
opt.CompressionType = LZ4
}
- producer, err := r.client.CreateProducer(opt)
+ return r.client.CreateProducer(opt)
+ }
- if err != nil {
- r.log.WithError(err).Error("Failed to create DLQ
producer")
- time.Sleep(bo.Next())
- continue
- }
- r.producer = producer
- return producer
+ res, err := internal.Retry(context.Background(), opFn, func(err error)
time.Duration {
+ r.log.WithError(err).Error("Failed to create DLQ producer")
+ return bo.Next()
+ })
+
+ if err == nil {
+ r.producer = res
}
+
+ return res
}
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index 632d5a4f..b7dea1fe 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -19,6 +19,7 @@ package internal
import (
"bytes"
+ "context"
"crypto/tls"
"crypto/x509"
"encoding/json"
@@ -146,25 +147,27 @@ func (c *httpClient) MakeRequest(method, endpoint string)
(*http.Response, error
}
func (c *httpClient) Get(endpoint string, obj interface{}, params
map[string]string) error {
- _, err := c.GetWithQueryParams(endpoint, obj, params, true)
- if _, ok := err.(*url.Error); ok {
- // We can retry this kind of requests over a connection error
because they're
- // not specific to a particular broker.
- bo := backoff.NewDefaultBackoffWithInitialBackOff(100 *
time.Millisecond)
- startTime := time.Now()
- var retryTime time.Duration
-
- for time.Since(startTime) < c.requestTimeout {
- retryTime = bo.Next()
- c.log.Debugf("Retrying httpRequest in {%v} with timeout
in {%v}", retryTime, c.requestTimeout)
- time.Sleep(retryTime)
- _, err = c.GetWithQueryParams(endpoint, obj, params,
true)
- if _, ok := err.(*url.Error); !ok {
- // We either succeeded or encountered a non
connection error
- break
- }
+ var err error
+ opFn := func() (struct{}, error) {
+ _, err = c.GetWithQueryParams(endpoint, obj, params, true)
+ if _, ok := err.(*url.Error); ok {
+ // We can retry this kind of requests over a connection
error because they're
+ // not specific to a particular broker.
+ return struct{}{}, err
}
+ return struct{}{}, nil
}
+
+ bo := backoff.NewDefaultBackoffWithInitialBackOff(100 *
time.Millisecond)
+
+ ctx, cancel := context.WithTimeout(context.Background(),
c.requestTimeout)
+ defer cancel()
+
+ _, _ = Retry(ctx, opFn, func(_ error) time.Duration {
+ retryTime := bo.Next()
+ c.log.Debugf("Retrying httpRequest in {%v} with timeout in
{%v}", retryTime, c.requestTimeout)
+ return retryTime
+ })
return err
}
diff --git a/pulsar/internal/retry.go b/pulsar/internal/retry.go
new file mode 100644
index 00000000..2f7bf658
--- /dev/null
+++ b/pulsar/internal/retry.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+ "context"
+ "errors"
+ "time"
+)
+
+type OpFn[T any] func() (T, error)
+
+// Retry the given operation until the returned error is nil or the context is
done.
+func Retry[T any](ctx context.Context, op OpFn[T], nextDuration func(error)
time.Duration) (T, error) {
+ var (
+ timer *time.Timer
+ res T
+ err error
+ )
+
+ cleanTimer := func() {
+ if timer != nil {
+ timer.Stop()
+ }
+ }
+ defer cleanTimer()
+
+ for {
+ res, err = op()
+ if err == nil {
+ return res, nil
+ }
+
+ duration := nextDuration(err)
+ if timer == nil {
+ timer = time.NewTimer(duration)
+ } else {
+ timer.Reset(duration)
+ }
+
+ select {
+ case <-ctx.Done():
+ return res, errors.Join(ctx.Err(), err)
+ case <-timer.C:
+ }
+ }
+}
diff --git a/pulsar/internal/retry_test.go b/pulsar/internal/retry_test.go
new file mode 100644
index 00000000..ab9f4dc6
--- /dev/null
+++ b/pulsar/internal/retry_test.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestRetryWithCtxBackground(t *testing.T) {
+ ctx := context.Background()
+ i := 0
+ res, err := Retry(ctx, func() (string, error) {
+ if i == 2 {
+ return "ok", nil
+ }
+ i++
+ return "", errors.New("error")
+ }, func(_ error) time.Duration {
+ return 1 * time.Second
+ })
+ require.NoError(t, err)
+ require.Equal(t, "ok", res)
+}
+
+func TestRetryWithCtxTimeout(t *testing.T) {
+ ctx, cancelFn := context.WithTimeout(context.Background(),
2*time.Second)
+ defer cancelFn()
+ retryErr := errors.New("error")
+ res, err := Retry(ctx, func() (string, error) {
+ return "", retryErr
+ }, func(err error) time.Duration {
+ require.Equal(t, retryErr, err)
+ return 1 * time.Second
+ })
+ require.ErrorIs(t, err, context.DeadlineExceeded)
+ require.ErrorContains(t, err, retryErr.Error())
+ require.Equal(t, "", res)
+}
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 0593203a..9a88a167 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -18,6 +18,7 @@
package internal
import (
+ "context"
"errors"
"fmt"
"net/url"
@@ -117,27 +118,26 @@ func (c *rpcClient) requestToHost(serviceNameResolver
*ServiceNameResolver,
requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message)
(*RPCResult, error) {
var err error
var host *url.URL
- var rpcResult *RPCResult
- startTime := time.Now()
bo := backoff.NewDefaultBackoffWithInitialBackOff(100 *
time.Millisecond)
// we can retry these requests because this kind of request is
// not specific to any particular broker
- for time.Since(startTime) < c.requestTimeout {
+ opFn := func() (*RPCResult, error) {
host, err = (*serviceNameResolver).ResolveHost()
if err != nil {
c.log.WithError(err).Errorf("rpc client failed to
resolve host")
return nil, err
}
- rpcResult, err = c.Request(host, host, requestID, cmdType,
message)
- // success we got a response
- if err == nil {
- break
- }
+ return c.Request(host, host, requestID, cmdType, message)
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(),
c.requestTimeout)
+ defer cancel()
+ rpcResult, err := Retry(ctx, opFn, func(_ error) time.Duration {
retryTime := bo.Next()
c.log.Debugf("Retrying request in {%v} with timeout in {%v}",
retryTime, c.requestTimeout)
- time.Sleep(retryTime)
- }
+ return retryTime
+ })
return rpcResult, err
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b1fc3f02..c5ae259a 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -471,8 +471,7 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo
*SchemaInfo) (schemaVer
func (p *partitionProducer) reconnectToBroker(connectionClosed
*connectionClosed) {
var (
- maxRetry int
- delayReconnectTime time.Duration
+ maxRetry int
)
if p.options.MaxReconnectToBroker == nil {
maxRetry = -1
@@ -482,49 +481,39 @@ func (p *partitionProducer)
reconnectToBroker(connectionClosed *connectionClosed
bo := p.backOffPolicyFunc()
- for maxRetry != 0 {
- select {
- case <-p.ctx.Done():
- return
- default:
- }
+ var assignedBrokerURL string
+ if connectionClosed != nil && connectionClosed.HasURL() {
+ assignedBrokerURL = connectionClosed.assignedBrokerURL
+ }
- if p.getProducerState() != producerReady {
- // Producer is already closing
- p.log.Info("producer state not ready, exit reconnect")
- return
+ opFn := func() (struct{}, error) {
+ if maxRetry == 0 {
+ return struct{}{}, nil
}
- var assignedBrokerURL string
-
- if connectionClosed != nil && connectionClosed.HasURL() {
- delayReconnectTime = 0
- assignedBrokerURL = connectionClosed.assignedBrokerURL
- connectionClosed = nil // Only attempt once
- } else {
- delayReconnectTime = bo.Next()
+ select {
+ case <-p.ctx.Done():
+ return struct{}{}, nil
+ default:
}
- p.log.WithFields(log.Fields{
- "assignedBrokerURL": assignedBrokerURL,
- "delayReconnectTime": delayReconnectTime,
- }).Info("Reconnecting to broker")
- time.Sleep(delayReconnectTime)
-
- // double check
if p.getProducerState() != producerReady {
// Producer is already closing
p.log.Info("producer state not ready, exit reconnect")
- return
+ return struct{}{}, nil
}
atomic.AddUint64(&p.epoch, 1)
err := p.grabCnx(assignedBrokerURL)
+ if assignedBrokerURL != "" {
+ // Only attempt once
+ assignedBrokerURL = ""
+ }
if err == nil {
// Successfully reconnected
p.log.WithField("cnx",
p._getConn().ID()).Info("Reconnected producer to broker")
bo.Reset()
- return
+ return struct{}{}, nil
}
p.log.WithError(err).Error("Failed to create producer at
reconnect")
errMsg := err.Error()
@@ -532,25 +521,25 @@ func (p *partitionProducer)
reconnectToBroker(connectionClosed *connectionClosed
// when topic is deleted, we should give up
reconnection.
p.log.Warn("Topic not found, stop reconnecting, close
the producer")
p.doClose(joinErrors(ErrTopicNotfound, err))
- break
+ return struct{}{}, nil
}
if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending
messages, stop reconnecting, close the producer")
p.doClose(joinErrors(ErrTopicTerminated, err))
- break
+ return struct{}{}, nil
}
if strings.Contains(errMsg,
errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed
exception, failing pending messages, stop reconnecting")
p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
- break
+ return struct{}{}, nil
}
if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending
messages, stop reconnecting")
p.doClose(joinErrors(ErrProducerFenced, err))
- break
+ return struct{}{}, nil
}
if maxRetry > 0 {
@@ -560,7 +549,17 @@ func (p *partitionProducer)
reconnectToBroker(connectionClosed *connectionClosed
if maxRetry == 0 || bo.IsMaxBackoffReached() {
p.metrics.ProducersReconnectMaxRetry.Inc()
}
+
+ return struct{}{}, err
}
+ _, _ = internal.Retry(context.Background(), opFn, func(_ error)
time.Duration {
+ delayReconnectTime := bo.Next()
+ p.log.WithFields(log.Fields{
+ "assignedBrokerURL": assignedBrokerURL,
+ "delayReconnectTime": delayReconnectTime,
+ }).Info("Reconnecting to broker")
+ return delayReconnectTime
+ })
}
func (p *partitionProducer) runEventsLoop() {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index b58c0608..24939a82 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -73,7 +73,7 @@ func TestProducerConnectError(t *testing.T) {
assert.Nil(t, producer)
assert.NotNil(t, err)
- assert.Equal(t, err.Error(), "connection error")
+ assert.ErrorContains(t, err, "connection error")
}
func TestProducerNoTopic(t *testing.T) {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index d2fa5597..83653570 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -221,7 +221,7 @@ func TestReaderConnectError(t *testing.T) {
assert.Nil(t, reader)
assert.NotNil(t, err)
- assert.Equal(t, err.Error(), "connection error")
+ assert.ErrorContains(t, err, "connection error")
}
func TestReaderOnSpecificMessage(t *testing.T) {
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index a9a67adb..198371a3 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -21,6 +21,8 @@ import (
"context"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
+
"github.com/apache/pulsar-client-go/pulsar/backoff"
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -135,7 +137,7 @@ func (r *retryRouter) getProducer() Producer {
// Retry to create producer indefinitely
bo := r.backOffPolicyFunc()
- for {
+ opFn := func() (Producer, error) {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.RetryLetterTopic
// the origin code sets to LZ4 compression with no options
@@ -144,14 +146,15 @@ func (r *retryRouter) getProducer() Producer {
opt.CompressionType = LZ4
}
- producer, err := r.client.CreateProducer(opt)
-
- if err != nil {
- r.log.WithError(err).Error("Failed to create RLQ
producer")
- time.Sleep(bo.Next())
- continue
- }
- r.producer = producer
- return producer
+ return r.client.CreateProducer(opt)
+ }
+ res, err := internal.Retry(context.Background(), opFn, func(err error)
time.Duration {
+ r.log.WithError(err).Error("Failed to create RLQ producer")
+ return bo.Next()
+ })
+ if err == nil {
+ r.producer = res
}
+
+ return res
}
diff --git a/pulsar/transaction_coordinator_client.go
b/pulsar/transaction_coordinator_client.go
index afde5427..e66b06ef 100644
--- a/pulsar/transaction_coordinator_client.go
+++ b/pulsar/transaction_coordinator_client.go
@@ -147,41 +147,37 @@ func (t *transactionHandler) reconnectToBroker() {
var delayReconnectTime time.Duration
var defaultBackoff = backoff.DefaultBackoff{}
- for {
+ opFn := func() (struct{}, error) {
if t.getState() == txnHandlerClosed {
// The handler is already closing
t.log.Info("transaction handler is closed, exit
reconnect")
- return
- }
-
- delayReconnectTime = defaultBackoff.Next()
-
- t.log.WithFields(log.Fields{
- "delayReconnectTime": delayReconnectTime,
- }).Info("Transaction handler will reconnect to the transaction
coordinator")
- time.Sleep(delayReconnectTime)
-
- // double check
- if t.getState() == txnHandlerClosed {
- // Txn handler is already closing
- t.log.Info("transaction handler is closed, exit
reconnect")
- return
+ return struct{}{}, nil
}
err := t.grabConn()
if err == nil {
// Successfully reconnected
t.log.Info("Reconnected transaction handler to broker")
- return
+ return struct{}{}, nil
}
+
t.log.WithError(err).Error("Failed to create transaction
handler at reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up
reconnection.
t.log.Warn("Topic Not Found")
- break
+ return struct{}{}, nil
}
+ return struct{}{}, err
}
+
+ _, _ = internal.Retry(context.Background(), opFn, func(_ error)
time.Duration {
+ delayReconnectTime = defaultBackoff.Next()
+ t.log.WithFields(log.Fields{
+ "delayReconnectTime": delayReconnectTime,
+ }).Info("Transaction handler will reconnect to the transaction
coordinator")
+ return delayReconnectTime
+ })
}
func (t *transactionHandler) checkRetriableError(err error, op any) bool {