PavelZeger opened a new pull request, #1490:
URL: https://github.com/apache/pulsar-client-go/pull/1490
Fixes #1481
### Motivation
When a `partitionConsumer` exhausts all broker reconnection attempts (controlled by
`MaxReconnectToBroker`), the client silently increments a metric and exits the retry loop,
leaving the consumer alive but unable to receive messages. There is no way for application
code to detect this failure or react to it (e.g. recreate the consumer or alert on-call).
### Modifications
Added two opt-in fields to `ConsumerOptions`:
- **`MaxReconnectToBrokerListener func(consumer Consumer, err error)`**: a callback invoked
  exactly once, on the internal goroutine that runs the reconnect loop, immediately after the
  last reconnect attempt fails. The `consumer` argument is the parent `Consumer` the
  application holds, and `err` is the last connection error. The listener fires when
  `MaxReconnectToBroker` retries are exhausted **or** when the configured backoff policy
  signals `IsMaxBackoffReached`.
- **`CloseConsumerOnMaxReconnectToBroker bool`**: when `true`, automatically closes the
  consumer after exhausting reconnect attempts. The close runs asynchronously after
  `MaxReconnectToBrokerListener` (if set) returns. Internally `parentConsumer.Close()` is
  launched in a goroutine; this cancels the consumer's context, which unblocks the
  `internal.Retry` loop, allowing `runEventsLoop` to process the close request without
  deadlocking.
Both fields default to their zero values (`nil` / `false`), so there is no behaviour change
for existing consumers.
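
The ordering described above (retry, fire the listener once, then close asynchronously) can be sketched with a small self-contained model. This is not the code in this PR: `options`, `fakeConsumer`, and `reconnect` are hypothetical stand-ins that only mirror the two new fields, and the retry limit is simplified to a plain `int` that is assumed to be at least 1.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// options is a hypothetical stand-in for the relevant ConsumerOptions fields.
type options struct {
	maxReconnect int                              // models MaxReconnectToBroker (assumed >= 1)
	listener     func(c *fakeConsumer, err error) // models MaxReconnectToBrokerListener
	closeOnMax   bool                             // models CloseConsumerOnMaxReconnectToBroker
}

// fakeConsumer is a hypothetical consumer whose Close is idempotent.
type fakeConsumer struct {
	closeOnce sync.Once
	closed    chan struct{}
}

func newFakeConsumer() *fakeConsumer {
	return &fakeConsumer{closed: make(chan struct{})}
}

// Close marks the consumer as closed; repeated calls are no-ops.
func (c *fakeConsumer) Close() {
	c.closeOnce.Do(func() { close(c.closed) })
}

// errUnreachable is an illustrative connection error.
var errUnreachable = errors.New("broker unreachable")

// reconnect calls dial up to opts.maxReconnect times. If every attempt fails,
// it invokes the listener once with the last error and then, if requested,
// closes the consumer from a separate goroutine so this loop never blocks on it.
func reconnect(c *fakeConsumer, opts options, dial func() error) error {
	var lastErr error
	for i := 0; i < opts.maxReconnect; i++ {
		if lastErr = dial(); lastErr == nil {
			return nil // reconnected; nothing to report
		}
	}
	if opts.listener != nil {
		opts.listener(c, lastErr)
	}
	if opts.closeOnMax {
		go c.Close()
	}
	return lastErr
}

func main() {
	c := newFakeConsumer()
	opts := options{
		maxReconnect: 3,
		closeOnMax:   true,
		listener: func(_ *fakeConsumer, err error) {
			fmt.Println("reconnect attempts exhausted:", err)
		},
	}
	err := reconnect(c, opts, func() error { return errUnreachable })
	<-c.closed // wait for the asynchronous close to finish
	fmt.Println("consumer closed, last error:", err)
}
```

Because the listener in this model runs inline on the retry goroutine, a slow callback delays the close; application callbacks should return quickly or hand work off to another goroutine.
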
### Why points 3 and 4 from the issue are not implemented in this PR
The original issue suggested two additional fixes:
**3. Propagate the error to the consumer's error channel**
The `Consumer` interface does not expose an error channel. Adding one would be a breaking
API change: every implementation (`consumer`, `consumer_multitopic`, `consumer_regex`,
`consumer_zero_queue`) would need a new method, and all existing callers that perform a
type-assertion or embed the interface would break. This is a larger design decision that
warrants its own issue and a deprecation / migration path. The `MaxReconnectToBrokerListener`
callback achieves the same observable outcome (application code is notified of the failure)
without modifying the public interface.
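
Applications that prefer channel semantics can bridge the callback to a channel they own. The following is a minimal sketch under stated assumptions: `Consumer` is a placeholder interface standing in for the client's type, and `reconnectFailure`, `newFailureListener`, and the topic name are hypothetical, not part of pulsar-client-go.

```go
package main

import (
	"errors"
	"fmt"
)

// Consumer is a placeholder for the client's Consumer interface.
type Consumer interface {
	Close()
}

// reconnectFailure is a hypothetical event type owned by the application.
type reconnectFailure struct {
	topic string
	err   error
}

// errRefused is an illustrative connection error.
var errRefused = errors.New("connection refused")

// newFailureListener returns a callback with the listener's shape and a
// receive-only channel that carries the failures it observes.
func newFailureListener(topic string, buffer int) (func(Consumer, error), <-chan reconnectFailure) {
	ch := make(chan reconnectFailure, buffer)
	listener := func(_ Consumer, err error) {
		select {
		case ch <- reconnectFailure{topic: topic, err: err}:
		default:
			// Buffer full: drop the event rather than block the caller's goroutine.
		}
	}
	return listener, ch
}

func main() {
	listener, failures := newFailureListener("persistent://public/default/orders", 1)

	// In a real program the client would make this call; it is invoked
	// directly here so the sketch runs without a broker.
	listener(nil, errRefused)

	f := <-failures
	fmt.Printf("consumer on %s failed: %v\n", f.topic, f.err)
}
```

The non-blocking send is a deliberate choice: the callback runs on an internal client goroutine, so it should never wait on a slow receiver.
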
**4. Update consumer state to a terminal "failed" state**
There is currently no `consumerFailed` state in the internal state machine
(`consumerInit → consumerReady → consumerClosing → consumerClosed`). Introducing a new
terminal state would require updating every state guard in `consumer_partition.go`
(there are more than a dozen) as well as the multi-topic and regex consumer wrappers.
In practice, enabling `CloseConsumerOnMaxReconnectToBroker` already transitions the
consumer through `consumerClosing → consumerClosed`, which is the correct terminal state
and prevents any further operations on a dead consumer. A separate "failed" state that
carries an error cause can be considered as a follow-up if observability tooling needs to
distinguish a failed-closed consumer from a normally-closed one.
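
Until such a state exists, an application can record the cause itself from inside the listener. A rough sketch, assuming a hypothetical application-side helper named `failureTracker` (nothing here is part of the client library):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// failureTracker is a hypothetical application-side helper that remembers
// why a consumer stopped.
type failureTracker struct {
	mu    sync.Mutex
	cause error
}

// errBrokerGone is an illustrative connection error.
var errBrokerGone = errors.New("broker unreachable")

// onMaxReconnect records the first non-nil failure cause. Wrap it in a closure
// that discards the consumer argument to match the listener signature.
func (t *failureTracker) onMaxReconnect(err error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.cause == nil {
		t.cause = err
	}
}

// describe reports how an already-closed consumer reached that state.
func (t *failureTracker) describe() string {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.cause != nil {
		return "failed-closed: " + t.cause.Error()
	}
	return "closed normally"
}

func main() {
	var normal, failed failureTracker
	failed.onMaxReconnect(errBrokerGone)

	fmt.Println(normal.describe())
	fmt.Println(failed.describe())
}
```

This keeps the distinction entirely in application code, so it needs no change to the internal state machine.
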
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change adds behaviour that requires a running broker to test end-to-end. Unit-level
verification can be done by constructing a `partitionConsumerOpts` directly with a
`maxReconnectToBroker` of 1 and asserting that the listener fires and the consumer closes.
Integration test coverage is tracked as a follow-up.
### Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API: yes — two new opt-in fields added to `ConsumerOptions`
- The schema: no
- The default values of configurations: no
- The wire protocol: no
### Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? GoDocs on the new `ConsumerOptions` fields