wenbingshen opened a new issue #14598:
URL: https://github.com/apache/pulsar/issues/14598
**Describe the bug**
Our application writes data to a multi-partition topic. When the broker has
a problem and the producer cannot write, a timeout exception occurs. Once
this happens, the producer may be unable to recover from the timeout
exception for a long time.
According to my investigation, `PartitionedProducerImpl` routes each message
to a partition chosen by the message router. After an exception occurs on one
partition, the router never routes new data to that partition again, so its
failed state is never replaced. Every write then fails, because we call
`producer.flush()` after each batch of 100 asynchronous sends, and it always
throws the exception that happened earlier.
Below is part of my application code, with syncMode=true:
```
for (int i = 0; i < 100; i++) {
    if (syncMode) {
        newMessage.send();
    } else {
        newMessage.sendAsync();
    }
}
if (!syncMode) {
    producer.flush();
}
```
`PartitionedProducerImpl` calls the `flushAsync` method of every
per-partition producer. Any `ProducerImpl` that hit an exception returns its
exceptionally completed `lastSendFuture`, so the application's flush fails.
https://github.com/apache/pulsar/blob/master/pulsar-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fclient%2Fimpl%2FPartitionedProducerImpl.java#L279
```
@Override
public CompletableFuture<Void> flushAsync() {
    return CompletableFuture.allOf(
            producers.values().stream().map(ProducerImpl::flushAsync).toArray(CompletableFuture[]::new));
}
```
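To illustrate why a single bad partition is enough to fail the whole flush, here is a minimal standalone sketch that uses only `java.util.concurrent.CompletableFuture`. The class `AllOfFailureSketch` and its `flushAll` helper are hypothetical names for this example and are not part of the Pulsar client; the list of futures stands in for the per-partition `flushAsync()` results.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllOfFailureSketch {

    // Hypothetical helper: combines per-partition flush results the same way
    // the quoted PartitionedProducerImpl.flushAsync() does, via allOf.
    static CompletableFuture<Void> flushAll(List<CompletableFuture<Void>> perPartition) {
        return CompletableFuture.allOf(
                perPartition.stream().toArray(CompletableFuture[]::new));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> healthy = CompletableFuture.completedFuture(null);

        // Simulated partition whose last send timed out.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("simulated send timeout"));

        // One failed input is enough to fail the combined future.
        System.out.println(flushAll(List.of(healthy, healthy, failed)).isCompletedExceptionally()); // prints "true"
        System.out.println(flushAll(List.of(healthy, healthy)).isCompletedExceptionally());         // prints "false"
    }
}
```

Calling `join()` on the combined future in the first case throws a `CompletionException` wrapping the simulated timeout, which is presumably what a blocking `flush()` surfaces to the application.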
https://github.com/apache/pulsar/blob/master/pulsar-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fclient%2Fimpl%2FProducerImpl.java#L1978
```
@Override
public CompletableFuture<Void> flushAsync() {
    CompletableFuture<MessageId> lastSendFuture;
    synchronized (ProducerImpl.this) {
        if (isBatchMessagingEnabled()) {
            batchMessageAndSend();
        }
        lastSendFuture = this.lastSendFuture; // this future has always failed
    }
    return lastSendFuture.thenApply(ignored -> null);
}
```
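The sticky behaviour described above can be reproduced with a toy model. This is only a sketch under stated assumptions: `ToyProducer` and `recordSend` are hypothetical and are not Pulsar's `ProducerImpl`. The model assumes that the stored future is replaced only when a new send reaches the same partition, which matches the behaviour reported in this issue.

```java
import java.util.concurrent.CompletableFuture;

public class StickyFlushSketch {

    /** Hypothetical stand-in for a per-partition producer; not Pulsar code. */
    static class ToyProducer {
        private CompletableFuture<Void> lastSendFuture = CompletableFuture.completedFuture(null);

        // Assumption: only a new send on this partition replaces the stored future.
        synchronized void recordSend(CompletableFuture<Void> sendResult) {
            lastSendFuture = sendResult;
        }

        // Same shape as the quoted flushAsync(): reports whatever the last send left behind.
        CompletableFuture<Void> flushAsync() {
            CompletableFuture<Void> last;
            synchronized (this) {
                last = this.lastSendFuture;
            }
            return last.thenApply(ignored -> null);
        }
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer();

        CompletableFuture<Void> timedOut = new CompletableFuture<>();
        timedOut.completeExceptionally(new RuntimeException("simulated send timeout"));
        producer.recordSend(timedOut);

        // With no new send on this partition, every flush reports the old failure.
        System.out.println(producer.flushAsync().isCompletedExceptionally()); // prints "true"
        System.out.println(producer.flushAsync().isCompletedExceptionally()); // prints "true"

        // Only a later successful send on the same partition clears it.
        producer.recordSend(CompletableFuture.completedFuture(null));
        System.out.println(producer.flushAsync().isCompletedExceptionally()); // prints "false"
    }
}
```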
