michaeljmarshall commented on a change in pull request #12259:
URL: https://github.com/apache/pulsar/pull/12259#discussion_r720549153
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -912,16 +906,14 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha
     }
     private void clearPendingMessagesWhenClose() {
-        PulsarClientException ex = new PulsarClientException.AlreadyClosedException(
-                format("The producer %s of the topic %s was already closed when closing the producers",
-                        producerName, topic));
-        pendingMessages.forEach(msg -> {
-            client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
-            msg.sendComplete(ex);
-            msg.cmd.release();
-            msg.recycle();
-        });
-        pendingMessages.clear();
+        setState(State.Closed);
+        synchronized (this) {
Review comment:
> it would appear atomic also with all the other changes in the sync block.
Yes, that is true. I'll move it back into the sync block. I wonder whether
these should be updated too:
https://github.com/apache/pulsar/blob/da5bac990042e86d339fc4cba431b555ec293d6d/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1504-L1517
I don't think it will matter too much because once the state is set to a
failure/closed state, the producer won't accept new messages anyway.
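
To illustrate the point, here is a minimal sketch, not the actual ProducerImpl code. It assumes a hypothetical SketchProducer whose state and pending queue are both guarded by the same monitor; the names SketchProducer, send, close, pendingCount, and failedMessages are made up for this example. With the state change inside the sync block, no caller can observe a Closed producer that still holds pending messages, and any send after close is rejected.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a producer; this is NOT the real ProducerImpl API.
class SketchProducer {
    enum State { Ready, Closed }

    // Both fields are guarded by the monitor of "this" (an assumption of this sketch).
    private State state = State.Ready;
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> failed = new ArrayList<>();

    // Returns false once the producer is closed, so nothing new is enqueued.
    synchronized boolean send(String msg) {
        if (state != State.Ready) {
            return false;
        }
        pending.add(msg);
        return true;
    }

    // The state change and the queue drain happen under one lock, so other
    // threads see them as a single atomic step.
    void close() {
        synchronized (this) {
            state = State.Closed;
            failed.addAll(pending); // stands in for failing each pending message
            pending.clear();
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    synchronized List<String> failedMessages() {
        return new ArrayList<>(failed);
    }

    public static void main(String[] args) {
        SketchProducer producer = new SketchProducer();
        System.out.println("send before close accepted: " + producer.send("a"));
        producer.close();
        System.out.println("send after close accepted: " + producer.send("b"));
        System.out.println("pending after close: " + producer.pendingCount());
    }
}
```

If the state were set before entering the sync block instead, a concurrent send would still be rejected, which is why the ordering probably does not matter much in practice; keeping it inside the block only makes the transition look atomic to other threads.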