wenbingshen opened a new issue #14598:
URL: https://github.com/apache/pulsar/issues/14598


   **Describe the bug**
   Our application writes data to a multi-partition topic. When the broker has 
a problem and the producer cannot write, a timeout exception occurs. Once this 
happens, the producer may keep failing with that timeout exception for a long 
time instead of recovering.
   
   According to my investigation, when `PartitionedProducerImpl` writes data, 
it routes each message to a partition chosen by the message router. After an 
exception occurs on one partition, the router never routes new data to that 
partition again. All subsequent write operations fail, because after every 100 
asynchronously sent messages we call `producer.flush()`, which always throws 
the exception that happened earlier.
   
   Below is part of my application code, with syncMode=true:
   ```
           for (int i = 0; i < 100; i++) {
               if (syncMode) {
                   newMessage.send();
               } else {
                   newMessage.sendAsync();
               }
           }
   
           if (!syncMode) {
               producer.flush();
           }
   ```
   `PartitionedProducerImpl` calls the `flushAsync` method of every 
per-partition producer. The `ProducerImpl` that hit the exception returns its 
exceptionally completed `lastSendFuture`, so the combined flush fails and the 
application cannot proceed.
   
https://github.com/apache/pulsar/blob/master/pulsar-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fclient%2Fimpl%2FPartitionedProducerImpl.java#L279
   ```
       @Override
       public CompletableFuture<Void> flushAsync() {
           return CompletableFuture.allOf(
                   producers.values().stream().map(ProducerImpl::flushAsync).toArray(CompletableFuture[]::new));
       }
   ```
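
To illustrate why a single bad partition is enough to fail the whole flush, here is a minimal JDK-only sketch. It does not use the Pulsar client; `AllOfFailureSketch`, `flushAll`, and `flushFails` are hypothetical names, and the futures merely stand in for the per-partition `flushAsync()` results. It relies only on the documented behavior of `CompletableFuture.allOf`, which completes exceptionally if any of its inputs does.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AllOfFailureSketch {
    // Same shape as the partitioned flush above: combine one future per partition.
    static CompletableFuture<Void> flushAll(List<CompletableFuture<Void>> perPartition) {
        return CompletableFuture.allOf(perPartition.toArray(new CompletableFuture[0]));
    }

    // Returns true if waiting on the combined future throws.
    static boolean flushFails(List<CompletableFuture<Void>> perPartition) {
        try {
            flushAll(perPartition).join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> healthy1 = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> healthy2 = CompletableFuture.completedFuture(null);
        // Stand-in for the partition whose last send timed out (assumption for illustration).
        CompletableFuture<Void> broken = new CompletableFuture<>();
        broken.completeExceptionally(new RuntimeException("simulated send timeout"));

        System.out.println(flushFails(List.of(healthy1, healthy2)));         // prints "false"
        System.out.println(flushFails(List.of(healthy1, broken, healthy2))); // prints "true"
    }
}
```

The healthy partitions finish normally, yet the combined future still reports the failure of the one broken partition.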
   
   
https://github.com/apache/pulsar/blob/master/pulsar-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fclient%2Fimpl%2FProducerImpl.java#L1978
   ```
       @Override
       public CompletableFuture<Void> flushAsync() {
           CompletableFuture<MessageId> lastSendFuture;
           synchronized (ProducerImpl.this) {
               if (isBatchMessagingEnabled()) {
                   batchMessageAndSend();
               }
               lastSendFuture = this.lastSendFuture; // this future has always failed
           }
           return lastSendFuture.thenApply(ignored -> null);
       }
   ```
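
The "sticky" failure can be reproduced without Pulsar. The sketch below is a simplified stand-in, not the real `ProducerImpl`: the class `StickyFlushSketch` and its `send(boolean)` method are hypothetical, and only the `lastSendFuture` field and the `thenApply(ignored -> null)` step mirror the code above. It assumes, as described in this report, that no new message reaches the failed producer, so nothing ever replaces the failed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class StickyFlushSketch {
    // Stand-in for the field of the same name in ProducerImpl.
    private CompletableFuture<String> lastSendFuture = CompletableFuture.completedFuture(null);

    // Hypothetical send: records the outcome as the most recent send future.
    synchronized CompletableFuture<String> send(boolean fail) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (fail) {
            f.completeExceptionally(new RuntimeException("simulated send timeout"));
        } else {
            f.complete("msg-id");
        }
        lastSendFuture = f;
        return f;
    }

    // Mirrors the last step of the flushAsync() above: derive the result from lastSendFuture.
    synchronized CompletableFuture<Void> flushAsync() {
        return lastSendFuture.thenApply(ignored -> null);
    }

    // Returns true if waiting on the flush throws.
    boolean flushFails() {
        try {
            flushAsync().join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        StickyFlushSketch producer = new StickyFlushSketch();
        producer.send(true);                       // one send fails
        System.out.println(producer.flushFails()); // prints "true"
        System.out.println(producer.flushFails()); // prints "true" again: nothing replaced the future
        producer.send(false);                      // only a new successful send clears the state
        System.out.println(producer.flushFails()); // prints "false"
    }
}
```

In this sketch, every flush keeps failing until a later send succeeds on the same producer, which matches the behavior described here when the router stops sending to the failed partition.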
   
   
![image](https://user-images.githubusercontent.com/35599757/157199226-8195d168-22d5-4b50-9f35-18e2a655e4d1.png)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
