PavelZeger opened a new pull request, #1490:
URL: https://github.com/apache/pulsar-client-go/pull/1490

   Fixes #1481
   
   ### Motivation
   
   When a `partitionConsumer` exhausts all broker reconnection attempts (controlled by
   `MaxReconnectToBroker`), the client silently increments a metric and exits the retry
   loop, leaving the consumer alive but unable to receive messages. There is no way for
   application code to detect this failure or react to it (e.g. recreate the consumer or
   alert on-call).
   
   ### Modifications
   
   Added two opt-in fields to `ConsumerOptions`:
   
   - **`MaxReconnectToBrokerListener func(consumer Consumer, err error)`**: a callback
     invoked exactly once, on the same internal goroutine that performs the reconnect
     attempts, immediately after the last attempt fails. The `consumer` argument is the
     parent `Consumer` the application holds, and `err` is the last connection error.
     The listener fires when `MaxReconnectToBroker` retries are exhausted **or** when
     the configured backoff policy signals `IsMaxBackoffReached`.
   
   - **`CloseConsumerOnMaxReconnectToBroker bool`**: when `true`, automatically closes
     the consumer once reconnect attempts are exhausted. The close runs asynchronously
     after `MaxReconnectToBrokerListener` (if set) returns. Internally,
     `parentConsumer.Close()` is launched in a goroutine; this cancels the consumer's
     context, which unblocks the `internal.Retry` loop and lets `runEventsLoop` process
     the close request without deadlocking.
   
   Both fields default to their zero values (`nil` / `false`), so there is no behaviour
   change for existing consumers.
   
   ### Why points 3 and 4 from the issue are not implemented in this PR
   
   The original issue suggested two additional fixes:
   
   **3. Propagate the error to the consumer's error channel**
   
   The `Consumer` interface does not expose an error channel. Adding one would be a
   breaking API change: every implementation (`consumer`, `consumer_multitopic`,
   `consumer_regex`, `consumer_zero_queue`) would need a new method, and all existing
   callers that perform a type-assertion or embed the interface would break. This is a
   larger design decision that warrants its own issue and a deprecation / migration
   path. The `MaxReconnectToBrokerListener` callback achieves the same observable
   outcome (application code is notified of the failure) without modifying the public
   interface.
   
   **4. Update consumer state to a terminal "failed" state**
   
   There is currently no `consumerFailed` state in the internal state machine
   (`consumerInit → consumerReady → consumerClosing → consumerClosed`). Introducing a
   new terminal state would require updating every state guard in
   `consumer_partition.go` (there are more than a dozen) as well as the multi-topic and
   regex consumer wrappers. In practice, enabling `CloseConsumerOnMaxReconnectToBroker`
   already transitions the consumer through `consumerClosing → consumerClosed`, which is
   the correct terminal state and prevents any further operations on a dead consumer. A
   separate "failed" state that carries an error cause can be considered as a follow-up
   if observability tooling needs to distinguish a failed-closed consumer from a
   normally-closed one.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change adds behaviour that requires a running broker to test end-to-end.
   Unit-level verification can be done by constructing a `partitionConsumerOpts`
   directly with a `maxReconnectToBroker` of 1 and asserting that the listener fires and
   the consumer closes. Integration test coverage is tracked as a follow-up.
   
   ### Does this pull request potentially affect one of the following parts:
   
   - Dependencies (does it add or upgrade a dependency): no
   - The public API: yes — two new opt-in fields added to `ConsumerOptions`
   - The schema: no
   - The default values of configurations: no
   - The wire protocol: no
   
   ### Documentation
   
   - Does this pull request introduce a new feature? yes
   - If yes, how is the feature documented? GoDocs on the new `ConsumerOptions` fields

