PavelZeger commented on code in PR #1507:
URL: https://github.com/apache/pulsar-client-go/pull/1507#discussion_r3376077338


##########
pulsar/consumer_partition.go:
##########
@@ -1771,6 +1778,16 @@ func (pc *partitionConsumer) SetRedirectedClusterURI(redirectedClusterURI string
        pc.redirectedClusterURI = redirectedClusterURI
 }
 
+func (pc *partitionConsumer) pause() {
+       pc.paused.Store(true)
+}
+
+func (pc *partitionConsumer) resume() {
+       if pc.paused.CompareAndSwap(true, false) {
+               pc.availablePermits.flowIfNeed()
+       }

Review Comment:
   Agreed. `resume()` was going through `flowIfNeed()`, which only sends 
permits once the owed count reaches half the queue size. That's fine for normal 
running (we don't want a flow message for every single message), but it's wrong 
for resume. If the broker has zero permits left and we owe it fewer than that 
threshold, resume sends nothing. And since no messages come in, the owed count 
never climbs to the threshold, so the consumer just sits there stuck.
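
   To make the stall concrete, here is a minimal sketch of the threshold 
check. `shouldFlow`, the queue size of 1000 and the owed count of 300 are 
made up for illustration, not taken from the client code:

```go
package main

import "fmt"

// shouldFlow is a hypothetical stand-in for the threshold check described
// above: permits are sent only once the owed count reaches half the
// receiver queue size.
func shouldFlow(owed, queueSize int32) bool {
	return owed >= queueSize/2
}

func main() {
	const queueSize = 1000 // assumed receiver queue size
	owed := int32(300)     // assumed permits owed at the moment of resume

	// 300 is below the threshold of 500, so nothing is sent. With zero
	// permits at the broker, no messages arrive and owed stays at 300.
	fmt.Println(shouldFlow(owed, queueSize))
}
```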
   
   Java has the same gap. Its `resume()` calls `increaseAvailablePermits(cnx, 
0)`, which checks the same threshold. So this fix actually makes the Go client 
a little more correct than Java here. I've opened an issue to fix the Java 
client as well: https://github.com/apache/pulsar/issues/25978.
   
   I added a small `flush()` method that resume uses instead. It just sends 
whatever we owe, no threshold check. Since it sends exactly the owed amount, it 
can never send too much (which was the original PIP concern). It uses the same 
compare-and-swap as the normal path so it won't race or double-send.
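
   Roughly, the idea looks like the sketch below. This is a simplified 
illustration, not the code in the PR: the `permits` type, its `send` callback 
and the retry loop are assumptions made so the example runs on its own.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// permits is a hypothetical stand-in for the consumer's permit counter.
type permits struct {
	owed atomic.Int32
	send func(n int32) // stands in for sending a flow request to the broker
}

// flush sends whatever is currently owed, with no threshold check.
// The compare-and-swap resets the counter only if it still holds the value
// that was read, so two concurrent callers cannot both send the same permits.
func (p *permits) flush() {
	for {
		n := p.owed.Load()
		if n <= 0 {
			return // nothing owed, nothing to send
		}
		if p.owed.CompareAndSwap(n, 0) {
			p.send(n) // exactly the owed amount, never more
			return
		}
		// The counter changed between Load and CompareAndSwap; retry.
	}
}

func main() {
	var sent []int32
	p := &permits{send: func(n int32) { sent = append(sent, n) }}

	p.owed.Store(3) // below any half-queue threshold
	p.flush()       // sends 3
	p.flush()       // nothing owed, sends nothing

	fmt.Println(sent, p.owed.Load()) // prints: [3] 0
}
```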



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to