Copilot commented on code in PR #1490:
URL: https://github.com/apache/pulsar-client-go/pull/1490#discussion_r3209532191


##########
pulsar/consumer_partition.go:
##########
@@ -2100,6 +2102,12 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
                pc.metrics.ConsumersReconnectFailure.Inc()
                if maxRetry == 0 || bo.IsMaxBackoffReached() {
                        pc.metrics.ConsumersReconnectMaxRetry.Inc()
+                       if pc.options.maxReconnectToBrokerListener != nil {
+                               pc.options.maxReconnectToBrokerListener(pc.parentConsumer, err)
+                       }

Review Comment:
   Once `bo.IsMaxBackoffReached()` returns true, this branch runs on every
subsequent reconnect failure, because the retry loop keeps returning `err` and
retrying. As a result, `MaxReconnectToBrokerListener` can be invoked multiple
times and, if auto-close is enabled, a new close goroutine is spawned each time.
If the intent is to treat max-backoff as an exhaustion condition, consider making
it terminal (e.g., stop retrying by returning `nil` or forcing `maxRetry=0`)
and/or guarding the notification and close with a once flag so they fire only
once per reconnect cycle.



##########
pulsar/consumer_partition.go:
##########
@@ -2100,6 +2102,12 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
                pc.metrics.ConsumersReconnectFailure.Inc()
                if maxRetry == 0 || bo.IsMaxBackoffReached() {
                        pc.metrics.ConsumersReconnectMaxRetry.Inc()
+                       if pc.options.maxReconnectToBrokerListener != nil {
+                               pc.options.maxReconnectToBrokerListener(pc.parentConsumer, err)
+                       }
+                       if pc.options.closeConsumerOnMaxReconnectToBroker {
+                               go pc.parentConsumer.Close()
+                       }

Review Comment:
   `go pc.parentConsumer.Close()` can be launched multiple times if the
exhaustion condition is hit repeatedly (notably when `IsMaxBackoffReached`
stays true). Even though `consumer.Close()` is idempotent, spawning an unbounded
number of goroutines is avoidable; consider gating this close trigger (e.g.,
with `sync.Once` or an atomic flag) together with the listener firing.



##########
pulsar/consumer.go:
##########
@@ -222,6 +222,18 @@ type ConsumerOptions struct {
        // MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
        MaxReconnectToBroker *uint
 
+       // MaxReconnectToBrokerListener is called when the consumer exhausts all reconnect attempts
+       // set by MaxReconnectToBroker. The consumer argument is the parent consumer, and err is the
+       // last connection error. Use this callback to detect silent failure and take recovery action
+       // (e.g. recreate the consumer). Only fires when MaxReconnectToBroker is set to a finite value
+       // or when the backoff policy signals IsMaxBackoffReached.
+       MaxReconnectToBrokerListener func(consumer Consumer, err error)
+
+       // CloseConsumerOnMaxReconnectToBroker, when true, automatically closes the consumer after
+       // exhausting all reconnect attempts. The close happens asynchronously after
+       // MaxReconnectToBrokerListener (if set) returns. Default: false.

Review Comment:
   The GoDoc implies applications can “take recovery action (e.g. recreate the
consumer)”. Because this listener is invoked from the partition consumer event
loop, calling `Consumer.Close()` synchronously inside the callback can deadlock:
`partitionConsumer.Close` sends a `closeRequest` on `eventsCh` and waits, but the
event loop is blocked in `reconnectToBroker`. Please document that callbacks
must not call `Close()` directly (or must do so asynchronously), and/or
recommend using `CloseConsumerOnMaxReconnectToBroker` for safe auto-close.
   



##########
pulsar/consumer_test.go:
##########
@@ -5710,3 +5710,112 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
                        "The consumer uses a different connection when reconnecting")
        }
 }
+
+func TestConsumerMaxReconnectToBrokerListener(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err)
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err)

Review Comment:
   The test starts a container but only terminates it at the end of the happy 
path. If any `require.*` assertion fails after the container is started, the 
container may be left running and affect subsequent tests. Consider registering 
`t.Cleanup`/`defer` to terminate the container immediately after it is created 
(and check the terminate error).
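
The registration pattern can be shown without a real container. In the sketch below every type is a hypothetical stand-in: `cleanupRegistrar` lists two methods that `*testing.T` also provides, `terminator` is an assumed subset of a container handle, and `fakeT` and `fakeContainer` exist only to make the example runnable outside `go test`. Whether a second `Terminate` call on an already-stopped container is harmless is an assumption worth checking, since the test also terminates the container deliberately mid-run.

```go
package main

import (
	"context"
	"fmt"
)

// cleanupRegistrar lists the methods used here; *testing.T provides both.
type cleanupRegistrar interface {
	Cleanup(func())
	Errorf(format string, args ...interface{})
}

// terminator is an assumed subset of a container handle.
type terminator interface {
	Terminate(ctx context.Context) error
}

// registerTermination should be called right after the container is
// created, before any assertion that could abort the test.
func registerTermination(t cleanupRegistrar, c terminator) {
	t.Cleanup(func() {
		if err := c.Terminate(context.Background()); err != nil {
			t.Errorf("terminate container: %v", err)
		}
	})
}

// fakeContainer counts Terminate calls.
type fakeContainer struct{ terminated int }

func (c *fakeContainer) Terminate(ctx context.Context) error {
	c.terminated++
	return nil
}

// fakeT records cleanups and runs them in reverse order, as testing.T does.
type fakeT struct {
	cleanups []func()
	errs     []string
}

func (t *fakeT) Cleanup(f func()) { t.cleanups = append(t.cleanups, f) }

func (t *fakeT) Errorf(format string, args ...interface{}) {
	t.errs = append(t.errs, fmt.Sprintf(format, args...))
}

func (t *fakeT) finish() {
	for i := len(t.cleanups) - 1; i >= 0; i-- {
		t.cleanups[i]()
	}
}

func main() {
	ft := &fakeT{}
	c := &fakeContainer{}
	registerTermination(ft, c)
	// Imagine a require.* assertion failing here: the test body stops
	// early, but the registered cleanup still runs.
	ft.finish()
	fmt.Println(c.terminated, len(ft.errs)) // prints "1 0"
}
```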



##########
pulsar/consumer_test.go:
##########
@@ -5710,3 +5710,112 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
                        "The consumer uses a different connection when reconnecting")
        }
 }
+
+func TestConsumerMaxReconnectToBrokerListener(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err)
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err)
+
+       pulsarClient, err := NewClient(ClientOptions{
+               URL:               endpoint,
+               ConnectionTimeout: 3 * time.Second,
+               OperationTimeout:  5 * time.Second,
+       })
+       require.NoError(t, err)
+       defer pulsarClient.Close()
+
+       maxRetry := uint(1)
+       listenerFired := make(chan struct{})
+       var (
+               listenerErr      error
+               listenerConsumer Consumer
+       )
+
+       topic := newTopicName()
+       var testConsumer Consumer
+       require.Eventually(t, func() bool {
+               testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{
+                       Topic:                topic,
+                       SubscriptionName:     "test-max-reconnect-listener",
+                       MaxReconnectToBroker: &maxRetry,
+                       BackOffPolicyFunc: func() backoff.Policy {
+                               return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second)
+                       },
+                       MaxReconnectToBrokerListener: func(c Consumer, e error) {
+                               listenerConsumer = c
+                               listenerErr = e
+                               close(listenerFired)
+                       },
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+       defer testConsumer.Close()
+
+       _ = c.Terminate(context.Background())
+
+       select {
+       case <-listenerFired:
+       case <-time.After(30 * time.Second):
+               t.Fatal("MaxReconnectToBrokerListener was not called within timeout")
+       }
+
+       assert.NotNil(t, listenerErr, "listener should receive the last connection error")
+       assert.Equal(t, testConsumer, listenerConsumer, "listener should receive the parent consumer")
+}
+
+func TestConsumerMaxReconnectToBrokerAutoClose(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err)

Review Comment:
   As with the previous test, consider ensuring the container is always 
terminated via `t.Cleanup`/`defer` right after creation so failures don’t leak 
a running container.
   



##########
pulsar/consumer_test.go:
##########
@@ -5710,3 +5710,112 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
                        "The consumer uses a different connection when reconnecting")
        }
 }
+
+func TestConsumerMaxReconnectToBrokerListener(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err)
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err)
+
+       pulsarClient, err := NewClient(ClientOptions{
+               URL:               endpoint,
+               ConnectionTimeout: 3 * time.Second,
+               OperationTimeout:  5 * time.Second,
+       })
+       require.NoError(t, err)
+       defer pulsarClient.Close()
+
+       maxRetry := uint(1)
+       listenerFired := make(chan struct{})
+       var (
+               listenerErr      error
+               listenerConsumer Consumer
+       )
+
+       topic := newTopicName()
+       var testConsumer Consumer
+       require.Eventually(t, func() bool {
+               testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{
+                       Topic:                topic,
+                       SubscriptionName:     "test-max-reconnect-listener",
+                       MaxReconnectToBroker: &maxRetry,
+                       BackOffPolicyFunc: func() backoff.Policy {
+                               return 
newTestBackoffPolicy(100*time.Millisecond, 1*time.Second)
+                       },

Review Comment:
   `BackOffPolicyFunc` here uses `newTestBackoffPolicy`, whose 
`IsMaxBackoffReached()` returns false, so the new behavior for the 
`IsMaxBackoffReached` exhaustion path is untested. Consider adding a 
unit/integration test with a backoff policy that flips `IsMaxBackoffReached()` 
to true to assert the listener fires once and the consumer either stops 
retrying or closes (depending on configuration).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
