This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b72d452a91b70b6949762700ee18c26bf3e832c2 Author: Michael Marshall <[email protected]> AuthorDate: Thu Dec 9 13:17:13 2021 -0600 [Java Client] Send CloseProducer on timeout (#13161) * [Java Client] Send CloseProducer on timeout * Fix failed test from ClientErrorsTest (cherry picked from commit 27d542994703ef40a09363f5de4e92cc3c9d3116) --- .../apache/pulsar/client/api/ClientErrorsTest.java | 56 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 12 ++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index a38b711..9a8cbfc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -38,6 +38,7 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -169,6 +170,7 @@ public class ClientErrorsTest { PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress()) .operationTimeout(1, TimeUnit.SECONDS).build(); final AtomicInteger counter = new AtomicInteger(0); + final AtomicInteger closeProducerCounter = new AtomicInteger(0); mockBrokerService.setHandleProducer((ctx, producer) -> { if (counter.incrementAndGet() == 2) { @@ -181,6 +183,10 @@ public class ClientErrorsTest { ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.ServiceNotReady, "msg")); }); + mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> { + 
closeProducerCounter.incrementAndGet(); + }); + try { client.newProducer().topic(topic).create(); fail("Should have failed"); @@ -188,8 +194,58 @@ public class ClientErrorsTest { // we fail even on the retriable error assertTrue(e instanceof PulsarClientException); } + // There is a small race condition here because the producer's timeout both fails the client creation + // and triggers sending CloseProducer. + Awaitility.await().until(() -> closeProducerCounter.get() == 1); + mockBrokerService.resetHandleProducer(); + mockBrokerService.resetHandleCloseProducer(); + } + + @Test + public void testCreatedProducerSendsCloseProducerAfterTimeout() throws Exception { + producerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1"); + } + + @Test + public void testCreatedPartitionedProducerSendsCloseProducerAfterTimeout() throws Exception { + producerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1"); + } + + private void producerCreatedThenFailsRetryTimeout(String topic) throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress()) + .operationTimeout(1, TimeUnit.SECONDS).build(); + final AtomicInteger producerCounter = new AtomicInteger(0); + final AtomicInteger closeProducerCounter = new AtomicInteger(0); + mockBrokerService.setHandleProducer((ctx, producer) -> { + int producerCount = producerCounter.incrementAndGet(); + if (producerCount == 1) { + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "producer1", + SchemaVersion.Empty)); + // Trigger reconnect + ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1)); + } else if (producerCount != 2) { + // Respond to subsequent requests to prevent timeouts + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "producer1", + SchemaVersion.Empty)); + } + // Don't respond to the second Producer command to ensure timeout + }); + + mockBrokerService.setHandleCloseProducer((ctx, 
closeProducer) -> { + closeProducerCounter.incrementAndGet(); + ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId())); + }); + + // Creating the producer should succeed; then, upon closure, it should reattempt creation. The first reattempt will + // time out, which triggers CloseProducer. The client might send the third Producer command before the + // below assertion, so we pass with 2 or 3. + client.newProducer().topic(topic).create(); + Awaitility.await().until(() -> closeProducerCounter.get() == 1); + Awaitility.await().until(() -> producerCounter.get() == 2 || producerCounter.get() == 3); mockBrokerService.resetHandleProducer(); + mockBrokerService.resetHandleCloseProducer(); } @Test diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index fc56ea3..bbf75ee 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1469,6 +1469,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne cnx.channel().close(); return null; } + + if (cause instanceof TimeoutException) { + // Creating the producer has timed out. We need to ensure the broker closes the producer + // in case it was indeed created, otherwise it might prevent new create producer operations, + // since we are not necessarily closing the connection. 
+ long closeRequestId = client.newRequestId(); + ByteBuf cmd = Commands.newCloseProducer(producerId, closeRequestId); + cnx.sendRequestWithId(cmd, closeRequestId); + } + if (cause instanceof PulsarClientException.ProducerFencedException) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Failed to create producer: {}", @@ -1477,7 +1487,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } else { log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage()); } - // Close the producer since topic does not exists. + // Close the producer since topic does not exist. if (cause instanceof PulsarClientException.TopicDoesNotExistException) { closeAsync().whenComplete((v, ex) -> { if (ex != null) {
