sijie opened a new issue #4642: producer#flush doesn't work as expected
URL: https://github.com/apache/pulsar/issues/4642
 
 
   **Describe the bug**
   
   The following code example should produce two message batches. but only one 
message batch is produced.
   
   ```
   producer.sendAsync("message1");
   producer.sendAsync("message2");
   producer.flush();
   
   producer.sendAsync("message3");
   producer.sendAsync("message4");
   producer.flush();
   ```
   
   **To Reproduce**
   
   Use the example code to reproduce
   
   **Expected behavior**
   
   When `flush` is triggered, it should flush out all the messages.
   
   **Additional context**
   
   The problem is due to following logic. if the connection is not connected, 
it doesn't send the batch. 
   
   ```
   private void processOpSendMsg(OpSendMsg op) {
           try {
               batchMessageContainer.clear();
               pendingMessages.put(op);
               ClientCnx cnx = cnx();
               if (isConnected()) {
                   // If we do have a connection, the message is sent 
immediately, otherwise we'll try again once a new
                   // connection is established
                   op.cmd.retain();
                   
cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, 
cnx, op));
                   stats.updateNumMsgsSent(op.numMessagesInBatch, 
op.batchSizeByte);
               } else {
                   if (log.isDebugEnabled()) {
                       log.debug("[{}] [{}] Connection is not ready -- 
sequenceId {}", topic, producerName,
                           op.sequenceId);
                   }
               }
           } catch (InterruptedException ie) {
               Thread.currentThread().interrupt();
               semaphore.release(op.numMessagesInBatch);
               if (op != null) {
                   op.callback.sendComplete(new PulsarClientException(ie));
               }
           } catch (Throwable t) {
               semaphore.release(op.numMessagesInBatch);
               log.warn("[{}] [{}] error while closing out batch -- {}", topic, 
producerName, t);
               if (op != null) {
                   op.callback.sendComplete(new PulsarClientException(t));
               }
           }
       }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to