PavelZeger opened a new issue, #25978:
URL: https://github.com/apache/pulsar/issues/25978

   ### Search before reporting
   
   - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Motivation
   
   I ran into this while adding pause/resume to the Go client 
(apache/pulsar-client-go#1507). The Go implementation mirrors the Java one, and 
during review I found the same gap exists in the Java client too. I already 
fixed it in the Go PR (https://github.com/apache/pulsar-client-go/pull/1507); 
this issue is to bring the same fix back to Java.
   
   When you pause a consumer and later resume it, `resume()` is supposed to 
tell the broker "OK, start sending me messages again." It does this by calling 
`increaseAvailablePermits(cnx(), 0)`:
   ```java
   // ConsumerImpl.resume()
   public void resume() {
       if (paused) {
           paused = false;
           increaseAvailablePermits(cnx(), 0);
       }
   }
   ```
   
   The problem is that `increaseAvailablePermits` only actually sends permits 
once the owed count reaches half the receiver queue size:
   ```java
   while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
       ...
       sendFlowPermitsToBroker(currentCnx, available);
       ...
   }
   ```
   
   That "half the queue" check is fine for normal running: you don't want to 
send a flow command for every single message. But it's the wrong thing to do on 
resume. If, right when you resume, the consumer owes fewer permits than that 
threshold and the broker has no permits left, then resume sends nothing. And 
since no messages are coming in, the owed count never goes up, so a flow 
command is never sent. The consumer just sits there quietly, even though you 
resumed it. This isn't slow, it's stuck. There's no way for it to recover on 
its own.
   
   When does this happen? Mostly when not every delivered message bumps the 
permit count by one (for example, duplicates or chunk fragments that get thrown 
away). In that case the owed count can end up below the threshold while the 
broker has already sent everything it was allowed to. Small `receiverQueueSize` 
values make it easier to hit. In the normal case the count lands back on a full 
queue size, so it usually works, which is why nobody noticed and the current 
tests don't catch it.
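   
   To make the stall concrete, here is a minimal, self-contained sketch of the 
threshold behavior described above. It is a toy model, not the real 
`ConsumerImpl`: the class `ThresholdPermitModel`, its counters, and the queue 
size of 10 are assumptions made for illustration, and the loop body is my 
reading of the elided snippet rather than a copy of it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the client-side permit accounting. It is NOT the real
// ConsumerImpl; class, field, and method names here are illustrative.
class ThresholdPermitModel {
    private final int receiverQueueSize;
    private final AtomicInteger availablePermits; // permits owed to the broker
    private volatile boolean paused;
    private int flowCommandsSent; // how many flow commands "went out"
    private int permitsSent;      // total permits carried by those commands

    ThresholdPermitModel(int receiverQueueSize, int owedPermits) {
        this.receiverQueueSize = receiverQueueSize;
        this.availablePermits = new AtomicInteger(owedPermits);
    }

    void pause() {
        paused = true;
    }

    void resume() {
        if (paused) {
            paused = false;
            increaseAvailablePermits(0); // same shape as the current resume()
        }
    }

    // Sends a flow command only once the owed count reaches half the queue.
    void increaseAvailablePermits(int delta) {
        int available = availablePermits.addAndGet(delta);
        while (available >= receiverQueueSize / 2 && !paused) {
            if (availablePermits.compareAndSet(available, 0)) {
                flowCommandsSent++; // stands in for sendFlowPermitsToBroker
                permitsSent += available;
                break;
            }
            available = availablePermits.get();
        }
    }

    int flowCommandsSent() { return flowCommandsSent; }
    int permitsSent() { return permitsSent; }
    int owedPermits() { return availablePermits.get(); }
}

class ThresholdPermitDemo {
    public static void main(String[] args) {
        // Queue size 10 gives a threshold of 5; the consumer owes only 3.
        ThresholdPermitModel below = new ThresholdPermitModel(10, 3);
        below.pause();
        below.resume();
        System.out.println(below.flowCommandsSent()); // prints 0: nothing sent
        System.out.println(below.owedPermits());      // prints 3: still owed

        // With 5 owed, the threshold is met and resume sends them.
        ThresholdPermitModel atThreshold = new ThresholdPermitModel(10, 5);
        atThreshold.pause();
        atThreshold.resume();
        System.out.println(atThreshold.permitsSent()); // prints 5
    }
}
```

   In the first case nothing in the model ever raises the owed count again, 
which is the stuck state: the 3 permits stay owed indefinitely unless new 
messages arrive, and they can't if the broker has none left to spend.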
   
   ### Solution
   
   Give resume its own way to send permits that ignores the "half the queue" 
check and just sends whatever is owed. Since it sends exactly what's owed and 
nothing more, it can never send too much. This is the same fix I used in the Go 
PR.
   ```java
   @Override
   public void resume() {
       if (paused) {
           paused = false;
           flushAvailablePermitsToBroker(cnx());
       }
   }
   
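   // Sends every owed permit to the broker, bypassing the half-queue threshold.
   private void flushAvailablePermitsToBroker(ClientCnx currentCnx) {
       int available = AVAILABLE_PERMITS_UPDATER.get(this);
       while (available > 0 && !paused) {
           // Claim the owed permits atomically so they are sent exactly once.
           if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
               sendFlowPermitsToBroker(currentCnx, available);
               break;
           } else {
               // Lost the race with a concurrent update; re-read and retry.
               available = AVAILABLE_PERMITS_UPDATER.get(this);
           }
       }
   }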
   ```
   It uses the same compare-and-swap trick the existing code already uses, so 
it won't race or send twice. If the consumer happens to be disconnected, 
`sendFlowPermitsToBroker` does nothing and the reconnect path grants a fresh 
batch anyway, so that case is safe. And `MultiTopicsConsumerImpl.resume()` just 
calls resume on each child, so this one change covers partitioned and 
multi-topic consumers too.
   
   For a test: the current `testPauseAndResume` drains the whole queue, so the 
owed count ends up back at a full queue size and the bug never shows. A real 
regression test needs to leave fewer permits owed than half the queue while the 
broker is at zero, then resume and check that a flow command actually goes out. 
A small unit test in `ConsumerImplTest` with a mocked connection is the easiest 
way to do that reliably.
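   
   As a rough sketch of what that check could look like, the example below 
models the flush-on-resume path against a recording stand-in for the mocked 
connection. Everything here is hypothetical scaffolding: `RecordingCnx`, 
`FlushOnResumeModel`, and `FlushOnResumeCheck` are names invented for this 
illustration, and a real test would exercise `ConsumerImpl` with a mocked 
`ClientCnx` instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a mocked connection: it only records the permit
// count carried by each flow command.
class RecordingCnx {
    final List<Integer> flowCommands = new ArrayList<>();

    void sendFlow(int permits) {
        flowCommands.add(permits);
    }
}

// Toy consumer that models only the pause/resume permit accounting.
class FlushOnResumeModel {
    private final AtomicInteger availablePermits; // permits owed to the broker
    private final RecordingCnx cnx;
    private volatile boolean paused;

    FlushOnResumeModel(RecordingCnx cnx, int owedPermits) {
        this.cnx = cnx;
        this.availablePermits = new AtomicInteger(owedPermits);
    }

    void pause() {
        paused = true;
    }

    void resume() {
        if (paused) {
            paused = false;
            flushAvailablePermits();
        }
    }

    // Sends whatever is owed, with no half-queue threshold.
    private void flushAvailablePermits() {
        int available = availablePermits.get();
        while (available > 0 && !paused) {
            if (availablePermits.compareAndSet(available, 0)) {
                cnx.sendFlow(available);
                break;
            }
            available = availablePermits.get();
        }
    }

    int owedPermits() {
        return availablePermits.get();
    }
}

class FlushOnResumeCheck {
    public static void main(String[] args) {
        RecordingCnx cnx = new RecordingCnx();
        // With a queue size of 10 the threshold would be 5; owe only 3.
        FlushOnResumeModel consumer = new FlushOnResumeModel(cnx, 3);
        consumer.pause();
        consumer.resume();
        System.out.println(cnx.flowCommands);       // prints [3]
        System.out.println(consumer.owedPermits()); // prints 0

        consumer.resume(); // not paused any more, so nothing else is sent
        System.out.println(cnx.flowCommands.size()); // prints 1
    }
}
```

   The assertions that matter for the regression are the first two: a flow 
command goes out even though the owed count is below half the queue, and it 
carries exactly the owed amount.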
   
   ### Alternatives
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
