This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2ad0e5a [Java Client] Use failPendingMessages to ensure proper cleanup (#12259)
2ad0e5a is described below
commit 2ad0e5afccaaae85969d2924920a55ce95e248f6
Author: Michael Marshall <[email protected]>
AuthorDate: Tue Oct 5 19:15:45 2021 -0500
[Java Client] Use failPendingMessages to ensure proper cleanup (#12259)
* [Java Client] Use failPendingMessages to ensure proper cleanup
* Update method name from code review comments
* Update pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
Co-authored-by: Matteo Merli <[email protected]>
* Move setState into sync block; consolidate client.cleanupProducer call
* Move cleanupProducer into sync block
* Make method closeAndClearPendingMessages synchronized
Co-authored-by: Matteo Merli <[email protected]>
---
.../pulsar/client/impl/ProducerCloseTest.java | 26 +++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 29 ++++++----------------
2 files changed, 34 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
index 0c4df15..706849f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
@@ -32,6 +32,7 @@ import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Test(groups = "broker-impl")
@@ -73,6 +74,31 @@ public class ProducerCloseTest extends ProducerConsumerBase {
Assert.assertEquals(completableFuture.isDone(), true);
}
+ @Test(timeOut = 10_000)
+ public void
testProducerCloseFailsPendingBatchWhenPreviousStateNotReadyCallback() throws
Exception {
+ initClient();
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+ .topic("testProducerClose")
+ .maxPendingMessages(10)
+ .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+ .batchingMaxBytes(Integer.MAX_VALUE)
+ .enableBatching(true)
+ .create();
+ CompletableFuture<MessageId> completableFuture = producer.newMessage()
+ .value("test-msg".getBytes(StandardCharsets.UTF_8))
+ .sendAsync();
+ // By setting the state to Failed, the close method will exit early
because the previous state was not Ready.
+ producer.setState(HandlerState.State.Failed);
+ producer.closeAsync();
+ Assert.assertTrue(completableFuture.isCompletedExceptionally());
+ try {
+ completableFuture.get();
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof
PulsarClientException.AlreadyClosedException);
+ }
+ }
+
private void initClient() throws PulsarClientException {
pulsarClient = PulsarClient.builder().
serviceUrl(lookupUrl.toString())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index d0d3db1..5177451 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -875,12 +875,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
ClientCnx cnx = cnx();
if (cnx == null || currentState != State.Ready) {
log.info("[{}] [{}] Closed Producer (not connected)", topic,
producerName);
- synchronized (this) {
- setState(State.Closed);
- client.cleanupProducer(this);
- clearPendingMessagesWhenClose();
- }
-
+ closeAndClearPendingMessages();
return CompletableFuture.completedFuture(null);
}
@@ -893,14 +888,9 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
if (exception == null || !cnx.ctx().channel().isActive()) {
// Either we've received the success response for the close
producer command from the broker, or the
// connection did break in the meantime. In any case, the
producer is gone.
- synchronized (ProducerImpl.this) {
- log.info("[{}] [{}] Closed Producer", topic, producerName);
- setState(State.Closed);
- clearPendingMessagesWhenClose();
- }
-
+ log.info("[{}] [{}] Closed Producer", topic, producerName);
+ closeAndClearPendingMessages();
closeFuture.complete(null);
- client.cleanupProducer(this);
} else {
closeFuture.completeExceptionally(exception);
}
@@ -911,17 +901,14 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
return closeFuture;
}
- private void clearPendingMessagesWhenClose() {
+ private synchronized void closeAndClearPendingMessages() {
+ setState(State.Closed);
+ client.cleanupProducer(this);
PulsarClientException ex = new
PulsarClientException.AlreadyClosedException(
format("The producer %s of the topic %s was already closed
when closing the producers",
producerName, topic));
- pendingMessages.forEach(msg -> {
-
client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
- msg.sendComplete(ex);
- msg.cmd.release();
- msg.recycle();
- });
- pendingMessages.clear();
+ // Use null for cnx to ensure that the pending messages are failed
immediately
+ failPendingMessages(null, ex);
}
@Override