merlimat commented on a change in pull request #12259:
URL: https://github.com/apache/pulsar/pull/12259#discussion_r720532673
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -875,11 +875,8 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha
ClientCnx cnx = cnx();
if (cnx == null || currentState != State.Ready) {
log.info("[{}] [{}] Closed Producer (not connected)", topic, producerName);
- synchronized (this) {
- setState(State.Closed);
- client.cleanupProducer(this);
- clearPendingMessagesWhenClose();
- }
+ client.cleanupProducer(this);
Review comment:
Would it make sense to also move `client.cleanupProducer(this);` to
`closeAndClearPendingMessages()`?
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
##########
@@ -73,6 +74,31 @@ public void testProducerCloseCallback() throws Exception {
Assert.assertEquals(completableFuture.isDone(), true);
}
+ @Test(timeOut = 10_000)
+ public void testProducerCloseFailsPendingBatchWhenPreviousStateNotReadyCallback() throws Exception {
+ initClient();
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic("testProducerClose")
+ .maxPendingMessages(10)
+ .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+ .batchingMaxBytes(Integer.MAX_VALUE)
+ .enableBatching(true)
+ .create();
+ final TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
+ final TypedMessageBuilder<byte[]> value = messageBuilder.value("test-msg".getBytes(StandardCharsets.UTF_8));
+ final CompletableFuture<MessageId> completableFuture = value.sendAsync();
Review comment:
This could be shortened to:
```suggestion
CompletableFuture<MessageId> completableFuture = producer.newMessage()
.value("test-msg".getBytes(StandardCharsets.UTF_8))
.sendAsync();
```
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -912,16 +906,14 @@ protected WriteInEventLoopCallback newObject(Handle<WriteInEventLoopCallback> ha
}
private void clearPendingMessagesWhenClose() {
- PulsarClientException ex = new PulsarClientException.AlreadyClosedException(
- format("The producer %s of the topic %s was already closed when closing the producers",
- producerName, topic));
- pendingMessages.forEach(msg -> {
- client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
- msg.sendComplete(ex);
- msg.cmd.release();
- msg.recycle();
- });
- pendingMessages.clear();
+ setState(State.Closed);
+ synchronized (this) {
Review comment:
I don't remember the original reasoning, but if the state variable is changed
inside the synchronized block, the change appears atomic together with all the
other changes made in that block.
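A minimal sketch of that point, using a hypothetical `SketchProducer` class rather than the real `ProducerImpl` (every name below is an assumption for illustration). When the state change and the queue cleanup run under the same monitor, any other thread that synchronizes on the same object sees either both changes or neither:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a producer; this is NOT the actual ProducerImpl code.
class SketchProducer {
    enum State { Ready, Closed }

    private State state = State.Ready;
    private final Queue<String> pendingMessages = new ArrayDeque<>();

    // Accepts a message only while the producer is Ready.
    synchronized boolean enqueue(String msg) {
        if (state != State.Ready) {
            return false;
        }
        pendingMessages.add(msg);
        return true;
    }

    // The state change and the cleanup share one synchronized block, so a
    // concurrent enqueue() can never observe "Closed" with messages still
    // queued, nor add a message after the queue has been cleared.
    int closeAndClearPendingMessages() {
        synchronized (this) {
            state = State.Closed;
            int failed = pendingMessages.size();
            pendingMessages.clear();
            return failed;
        }
    }

    synchronized State state() {
        return state;
    }

    synchronized int pendingCount() {
        return pendingMessages.size();
    }
}
```

If the state were set outside the block instead, a thread could in principle see `Closed` while the queue is still being drained, which may or may not matter depending on how the other code paths read the state.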