gaoran10 opened a new issue #14864:
URL: https://github.com/apache/pulsar/issues/14864


   **Describe the bug**
   
   When batching is enabled on the producer, each message is first added to the 
batch container. If the producer state is not `Ready` at that point, there is no 
subsequent logic that moves the messages from the batch container to the pending 
messages queue.
   
   Before PR #14185 there was a timer task that periodically moved messages 
from the batch container (`batchMessageContainer`) to the pending messages 
queue. PR #14185 optimized the batch flush logic: it removed the timer task and 
instead schedules a flush task after a message is added.
   
   Relevant code
   
   Method `serializeAndSendMessage`:
   ```
   boolean isBatchFull = batchMessageContainer.add(msg, callback);
   lastSendFuture = callback.getFuture();
   payload.release();
   if (isBatchFull) {
       batchMessageAndSend(false);
   } else {
       maybeScheduleBatchFlushTask();
   }
   ```
   
   If the producer state is not `Ready`, the batch flush task is not 
scheduled:
   ```
   private void maybeScheduleBatchFlushTask() {
       if (this.batchFlushTask != null || getState() != State.Ready) {
           log.info("xxxx [{}] MaybeScheduleBatchFlushTask state: {}", topic, getState());
           return;
       }
       scheduleBatchFlushTask(conf.getBatchingMaxPublishDelayMicros());
   }
   ```
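
To make the failure mode concrete, here is a minimal, dependency-free sketch of the behavior described above. It is a toy model, not the Pulsar client code: `BatchFlushSketch`, `ToyProducer`, `runScheduledFlushIfAny`, and the two-value `State` enum are hypothetical names introduced only for illustration. The guard in `maybeScheduleBatchFlushTask` mirrors the one quoted in this issue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of the flush logic described in this issue; not the Pulsar client code. */
public class BatchFlushSketch {
    // Hypothetical, reduced state set; the real producer has more states.
    enum State { Connecting, Ready }

    static class ToyProducer {
        State state = State.Connecting;
        boolean flushTaskScheduled = false;
        final List<String> batchContainer = new ArrayList<>();
        final Queue<List<String>> pendingMessages = new ArrayDeque<>();

        // Mirrors the "batch not full" branch: add the message, then maybe schedule a flush.
        void send(String msg) {
            batchContainer.add(msg);
            maybeScheduleBatchFlushTask();
        }

        // Same guard as in the issue: nothing is scheduled unless the state is Ready.
        void maybeScheduleBatchFlushTask() {
            if (flushTaskScheduled || state != State.Ready) {
                return;
            }
            flushTaskScheduled = true;
        }

        // Stands in for the scheduled task firing after the publish delay.
        void runScheduledFlushIfAny() {
            if (!flushTaskScheduled) {
                return;
            }
            flushTaskScheduled = false;
            if (!batchContainer.isEmpty()) {
                pendingMessages.add(new ArrayList<>(batchContainer));
                batchContainer.clear();
            }
        }
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer();
        producer.send("m1");               // sent while the connection is down
        producer.state = State.Ready;      // the reconnect completes later
        producer.runScheduledFlushIfAny(); // no task was ever scheduled, so nothing moves
        System.out.println("batched=" + producer.batchContainer.size()
                + " pending=" + producer.pendingMessages.size());
    }
}
```

In this model the message sent while the state is `Connecting` stays in the batch container even after the state becomes `Ready`, because no later step schedules a flush for it. A message sent after the state is `Ready` is flushed as expected.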
   
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Create a producer
   2. Mock a broker exception so that the connection is closed.
   3. Send normal messages asynchronously and call `get` on the returned future, 
or send messages within a transaction.
   4. The process gets stuck.
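
The hang in the last step can be observed without blocking forever by waiting on the send future with a timeout. The sketch below uses only the JDK. `StuckSendProbe` and `completesWithin` are hypothetical names, and the never-completed `CompletableFuture` is a placeholder for the future returned by an async send whose batch is never flushed; it does not talk to a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StuckSendProbe {
    // Returns true if the future completes within the timeout, false if the wait times out.
    static boolean completesWithin(CompletableFuture<?> future, long timeout, TimeUnit unit)
            throws Exception {
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder for a send future that is never completed (assumption for illustration).
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println(completesWithin(neverCompleted, 200, TimeUnit.MILLISECONDS));
    }
}
```

A bounded wait like this turns the stuck `get` call into a detectable timeout, which is convenient in a test that reproduces the issue.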
   
   **Expected behavior**
   Messages should be published normally.
   

