Copilot commented on code in PR #1507:
URL: https://github.com/apache/pulsar-client-go/pull/1507#discussion_r3377613245
##########
pulsar/consumer_partition.go:
##########
@@ -331,6 +338,24 @@ func (p *availablePermits) flowIfNeed() {
}
}
+func (p *availablePermits) flush() {
+ if p.pc.paused.Load() {
+ return
+ }
+
+ current := p.get()
+ if current > 0 {
+ if !p.permits.CompareAndSwap(current, 0) {
+ return
+ }
+
+ p.pc.log.Debugf("flushing withheld permits=%d", current)
+ if err := p.pc.internalFlow(uint32(current)); err != nil {
+ p.pc.log.WithError(err).Error("unable to send permits")
+ }
+ }
+}
Review Comment:
`availablePermits.flush()` does a single `CompareAndSwap` attempt and
returns on failure. Under concurrent `add()` activity this can exit without
sending any permits even though `permits` remains > 0, which defeats the
purpose of `resume()` (and can leave the consumer stuck if no further `add()`
calls occur to reach the flow threshold). Make `flush()` retry the CAS until it
either observes 0 permits or successfully swaps the current value to 0 and
sends it.
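
A minimal, self-contained sketch of the retry loop suggested above. The `permitCounter` type and its `send` callback are hypothetical stand-ins for `availablePermits` and `internalFlow`; only the CAS-retry shape is the point, and the real fields may differ.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// permitCounter is a hypothetical stand-in for availablePermits.
type permitCounter struct {
	permits atomic.Int32
	paused  atomic.Bool
	send    func(n uint32) error // stands in for internalFlow (assumption)
}

// flush keeps retrying until it either observes no permits or
// successfully swaps the observed value to 0 and sends it.
func (p *permitCounter) flush() {
	if p.paused.Load() {
		return
	}
	for {
		current := p.permits.Load()
		if current <= 0 {
			return // nothing withheld
		}
		if p.permits.CompareAndSwap(current, 0) {
			if err := p.send(uint32(current)); err != nil {
				fmt.Println("unable to send permits:", err)
			}
			return
		}
		// The CAS lost a race with a concurrent add(); reload and retry.
	}
}

func main() {
	var sent uint32
	p := &permitCounter{send: func(n uint32) error { sent += n; return nil }}
	p.permits.Store(7)
	p.flush()
	fmt.Println("sent:", sent, "remaining:", p.permits.Load())
}
```

If the counter can never be negative, an unconditional `Swap(0)` followed by a `> 0` check might be a simpler way to get the same effect; whether that holds here depends on how `add()` and `reset()` use the counter.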
##########
pulsar/consumer_test.go:
##########
@@ -6441,3 +6441,224 @@ func TestIsNonRetriableSubscribeError(t *testing.T) {
})
}
}
+
+func drainUntilTimeout(t *testing.T, consumer Consumer, perMsgTimeout time.Duration) int {
+ t.Helper()
+ count := 0
+ for {
+ ctx, cancel := context.WithTimeout(context.Background(), perMsgTimeout)
+ msg, err := consumer.Receive(ctx)
+ cancel()
+ if err != nil {
+ return count
+ }
Review Comment:
`drainUntilTimeout` stops draining on *any* `Receive` error and treats it as
the timeout condition. That can hide unexpected failures (e.g., consumer
closed, connection errors) and make the pause/resume assertions misleading.
Assert that the error is actually the expected context deadline before
returning.
##########
pulsar/consumer_regex.go:
##########
@@ -348,6 +351,28 @@ func (c *regexConsumer) Name() string {
return c.consumerName
}
+func (c *regexConsumer) Pause() {
+ c.consumersLock.Lock()
+ defer c.consumersLock.Unlock()
+ c.paused.Store(true)
+ for _, con := range c.consumers {
+ con.Pause()
+ }
+}
+
+func (c *regexConsumer) Resume() {
+ c.consumersLock.Lock()
+ defer c.consumersLock.Unlock()
+ c.paused.Store(false)
+ for _, con := range c.consumers {
+ con.Resume()
+ }
+}
Review Comment:
`regexConsumer.Pause/Resume` hold `consumersLock` while calling
`con.Pause()` / `con.Resume()` on child consumers. `Resume()` can trigger
`partitionConsumer.resume()` → `internalFlow()` (network I/O), so this can
block topic discovery/unsubscribe work and increases deadlock risk if any child
implementation calls back into `regexConsumer`. Snapshot the current consumers
under the lock, then release the lock before invoking child methods.