Copilot commented on code in PR #1490:
URL: https://github.com/apache/pulsar-client-go/pull/1490#discussion_r3209532191
##########
pulsar/consumer_partition.go:
##########
@@ -2100,6 +2102,12 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
 pc.metrics.ConsumersReconnectFailure.Inc()
 if maxRetry == 0 || bo.IsMaxBackoffReached() {
     pc.metrics.ConsumersReconnectMaxRetry.Inc()
+    if pc.options.maxReconnectToBrokerListener != nil {
+        pc.options.maxReconnectToBrokerListener(pc.parentConsumer, err)
+    }
Review Comment:
When `bo.IsMaxBackoffReached()` becomes true, this branch will execute on
every subsequent reconnect failure (since the retry loop continues returning
`err`). That means `MaxReconnectToBrokerListener` can be invoked multiple times
and (if enabled) repeated close goroutines can be spawned. If the intent is to
treat max-backoff as an exhaustion condition, consider making it a terminal
condition (e.g., stop retrying by returning `nil`/forcing `maxRetry=0`) and/or
guard the notification/close with a once flag so it fires only once per
reconnect cycle.
##########
pulsar/consumer_partition.go:
##########
@@ -2100,6 +2102,12 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
 pc.metrics.ConsumersReconnectFailure.Inc()
 if maxRetry == 0 || bo.IsMaxBackoffReached() {
     pc.metrics.ConsumersReconnectMaxRetry.Inc()
+    if pc.options.maxReconnectToBrokerListener != nil {
+        pc.options.maxReconnectToBrokerListener(pc.parentConsumer, err)
+    }
+    if pc.options.closeConsumerOnMaxReconnectToBroker {
+        go pc.parentConsumer.Close()
+    }
Review Comment:
`go pc.parentConsumer.Close()` can be launched multiple times if the
exhaustion condition is hit repeatedly (notably when `IsMaxBackoffReached`
stays true). Even though `consumer.Close()` is idempotent, spawning unbounded
goroutines is avoidable; consider gating this close trigger (e.g., with
sync.Once/atomic) together with the listener firing.
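One possible shape for such a gate, sketched with `atomic.Bool` (Go 1.19+). The type `exhaustionGate` and its methods are hypothetical names, not part of the client; the idea is that the listener call and the close goroutine share one compare-and-swap, so both happen at most once until the gate is reset.

```go
package main

import "sync/atomic"

// exhaustionGate is a hypothetical helper: it lets the exhaustion branch run
// its side effects at most once until reset is called.
type exhaustionGate struct {
	fired atomic.Bool
}

// fire runs notify synchronously and closeAsync in a new goroutine, but only
// for the first caller since the last reset. It reports whether it fired.
func (g *exhaustionGate) fire(notify, closeAsync func()) bool {
	if !g.fired.CompareAndSwap(false, true) {
		return false // already fired in this reconnect cycle
	}
	if notify != nil {
		notify()
	}
	if closeAsync != nil {
		go closeAsync()
	}
	return true
}

// reset re-arms the gate, for example after a successful reconnect.
func (g *exhaustionGate) reset() {
	g.fired.Store(false)
}
```

A `sync.Once` would work equally well if the gate never needs to be re-armed; the atomic variant is shown only because it makes a per-cycle reset straightforward.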
##########
pulsar/consumer.go:
##########
@@ -222,6 +222,18 @@ type ConsumerOptions struct {
     // MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
     MaxReconnectToBroker *uint
+    // MaxReconnectToBrokerListener is called when the consumer exhausts all reconnect attempts
+    // set by MaxReconnectToBroker. The consumer argument is the parent consumer, and err is the
+    // last connection error. Use this callback to detect silent failure and take recovery action
+    // (e.g. recreate the consumer). Only fires when MaxReconnectToBroker is set to a finite value
+    // or when the backoff policy signals IsMaxBackoffReached.
+    MaxReconnectToBrokerListener func(consumer Consumer, err error)
+
+    // CloseConsumerOnMaxReconnectToBroker, when true, automatically closes the consumer after
+    // exhausting all reconnect attempts. The close happens asynchronously after
+    // MaxReconnectToBrokerListener (if set) returns. Default: false.
Review Comment:
The GoDoc implies applications can “take recovery action (e.g. recreate the
consumer)”. Since this listener is invoked from the partition consumer event
loop, calling `Consumer.Close()` synchronously inside the callback can deadlock
(partitionConsumer.Close sends a closeRequest on `eventsCh` and waits, but the
event loop is blocked in `reconnectToBroker`). Please document that callbacks
must not call `Close()` directly (or must do so asynchronously), and/or
recommend using `CloseConsumerOnMaxReconnectToBroker` for safe auto-close.
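For the "do so asynchronously" option, a small generic wrapper is one way to keep user callbacks off the event loop. This is only a sketch: `asyncListener` is a hypothetical helper, written with a type parameter so it compiles without the client package; with the option proposed in this PR, `C` would be `Consumer`.

```go
package main

// asyncListener wraps fn so that every invocation runs in its own goroutine.
// The returned function therefore returns immediately, even when fn blocks
// (for example, on a Close call that waits for the event loop).
func asyncListener[C any](fn func(C, error)) func(C, error) {
	return func(consumer C, err error) {
		go fn(consumer, err)
	}
}

// Hypothetical usage with the option added in this PR:
//
//	MaxReconnectToBrokerListener: asyncListener(func(c Consumer, err error) {
//		c.Close() // safe here: this runs outside the event loop
//		// recreate the consumer ...
//	}),
```

The trade-off is that the listener no longer completes before the reconnect path continues, so it cannot rely on running before an automatic close triggered by `CloseConsumerOnMaxReconnectToBroker`.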
##########
pulsar/consumer_test.go:
##########
@@ -5710,3 +5710,112 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
         "The consumer uses a different connection when reconnecting")
     }
 }
+
+func TestConsumerMaxReconnectToBrokerListener(t *testing.T) {
+    req := testcontainers.ContainerRequest{
+        Image: getPulsarTestImage(),
+        ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+        WaitingFor: wait.ForExposedPort(),
+        Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+    }
+    c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+        ContainerRequest: req,
+        Started: true,
+    })
+    require.NoError(t, err)
+    endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+    require.NoError(t, err)
Review Comment:
The test starts a container but only terminates it at the end of the happy
path. If any `require.*` assertion fails after the container is started, the
container may be left running and affect subsequent tests. Consider registering
`t.Cleanup`/`defer` to terminate the container immediately after it is created
(and check the terminate error).
##########
pulsar/consumer_test.go:
##########
@@ -5710,3 +5710,112 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
         "The consumer uses a different connection when reconnecting")
     }
 }
+
+func TestConsumerMaxReconnectToBrokerListener(t *testing.T) {
+    req := testcontainers.ContainerRequest{
+        Image: getPulsarTestImage(),
+        ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+        WaitingFor: wait.ForExposedPort(),
+        Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+    }
+    c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+        ContainerRequest: req,
+        Started: true,
+    })
+    require.NoError(t, err)
+    endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+    require.NoError(t, err)
+
+    pulsarClient, err := NewClient(ClientOptions{
+        URL: endpoint,
+        ConnectionTimeout: 3 * time.Second,
+        OperationTimeout: 5 * time.Second,
+    })
+    require.NoError(t, err)
+    defer pulsarClient.Close()
+
+    maxRetry := uint(1)
+    listenerFired := make(chan struct{})
+    var (
+        listenerErr error
+        listenerConsumer Consumer
+    )
+
+    topic := newTopicName()
+    var testConsumer Consumer
+    require.Eventually(t, func() bool {
+        testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{
+            Topic: topic,
+            SubscriptionName: "test-max-reconnect-listener",
+            MaxReconnectToBroker: &maxRetry,
+            BackOffPolicyFunc: func() backoff.Policy {
+                return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second)
+            },
+            MaxReconnectToBrokerListener: func(c Consumer, e error) {
+                listenerConsumer = c
+                listenerErr = e
+                close(listenerFired)
+            },
+        })
+        return err == nil
+    }, 30*time.Second, 1*time.Second)
+    defer testConsumer.Close()
+
+    _ = c.Terminate(context.Background())
+
+    select {
+    case <-listenerFired:
+    case <-time.After(30 * time.Second):
+        t.Fatal("MaxReconnectToBrokerListener was not called within timeout")
+    }
+
+    assert.NotNil(t, listenerErr, "listener should receive the last connection error")
+    assert.Equal(t, testConsumer, listenerConsumer, "listener should receive the parent consumer")
+}
+
+func TestConsumerMaxReconnectToBrokerAutoClose(t *testing.T) {
+    req := testcontainers.ContainerRequest{
+        Image: getPulsarTestImage(),
+        ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+        WaitingFor: wait.ForExposedPort(),
+        Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+    }
+    c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+        ContainerRequest: req,
+        Started: true,
+    })
+    require.NoError(t, err)
Review Comment:
As with the previous test, consider ensuring the container is always
terminated via `t.Cleanup`/`defer` right after creation so failures don’t leak
a running container.
##########
pulsar/consumer_test.go:
##########
@@ -5710,3 +5710,112 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
         "The consumer uses a different connection when reconnecting")
     }
 }
+
+func TestConsumerMaxReconnectToBrokerListener(t *testing.T) {
+    req := testcontainers.ContainerRequest{
+        Image: getPulsarTestImage(),
+        ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+        WaitingFor: wait.ForExposedPort(),
+        Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"},
+    }
+    c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+        ContainerRequest: req,
+        Started: true,
+    })
+    require.NoError(t, err)
+    endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+    require.NoError(t, err)
+
+    pulsarClient, err := NewClient(ClientOptions{
+        URL: endpoint,
+        ConnectionTimeout: 3 * time.Second,
+        OperationTimeout: 5 * time.Second,
+    })
+    require.NoError(t, err)
+    defer pulsarClient.Close()
+
+    maxRetry := uint(1)
+    listenerFired := make(chan struct{})
+    var (
+        listenerErr error
+        listenerConsumer Consumer
+    )
+
+    topic := newTopicName()
+    var testConsumer Consumer
+    require.Eventually(t, func() bool {
+        testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{
+            Topic: topic,
+            SubscriptionName: "test-max-reconnect-listener",
+            MaxReconnectToBroker: &maxRetry,
+            BackOffPolicyFunc: func() backoff.Policy {
+                return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second)
+            },
Review Comment:
`BackOffPolicyFunc` here uses `newTestBackoffPolicy`, whose
`IsMaxBackoffReached()` returns false, so the new behavior for the
`IsMaxBackoffReached` exhaustion path is untested. Consider adding a
unit/integration test with a backoff policy that flips `IsMaxBackoffReached()`
to true to assert the listener fires once and the consumer either stops
retrying or closes (depending on configuration).