This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 168bbab4ca5d13afda5eaa173dbcf9f923f22427 Author: Michael Marshall <[email protected]> AuthorDate: Thu Dec 9 13:17:13 2021 -0600 [Java Client] Send CloseProducer on timeout (#13161) * [Java Client] Send CloseProducer on timeout * Fix failed test from ClientErrorsTest (cherry picked from commit 27d542994703ef40a09363f5de4e92cc3c9d3116) --- .../apache/pulsar/client/api/ClientErrorsTest.java | 58 +++++++++++++++++++++- .../apache/pulsar/client/impl/ProducerImpl.java | 12 ++++- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index f66b198..d7507d2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -39,6 +39,7 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -170,6 +171,7 @@ public class ClientErrorsTest { PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress()) .operationTimeout(1, TimeUnit.SECONDS).build(); final AtomicInteger counter = new AtomicInteger(0); + final AtomicInteger closeProducerCounter = new AtomicInteger(0); mockBrokerService.setHandleProducer((ctx, producer) -> { if (counter.incrementAndGet() == 2) { @@ -182,6 +184,10 @@ public class ClientErrorsTest { ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.ServiceNotReady, "msg")); }); + mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> { + 
closeProducerCounter.incrementAndGet(); + }); + try { client.newProducer().topic(topic).create(); fail("Should have failed"); @@ -189,8 +195,58 @@ public class ClientErrorsTest { // we fail even on the retriable error assertTrue(e instanceof PulsarClientException); } + // There is a small race condition here because the producer's timeout both fails the client creation + // and triggers sending CloseProducer. + Awaitility.await().until(() -> closeProducerCounter.get() == 1); + mockBrokerService.resetHandleProducer(); + mockBrokerService.resetHandleCloseProducer(); + } + + @Test + public void testCreatedProducerSendsCloseProducerAfterTimeout() throws Exception { + producerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1"); + } + + @Test + public void testCreatedPartitionedProducerSendsCloseProducerAfterTimeout() throws Exception { + producerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1"); + } + + private void producerCreatedThenFailsRetryTimeout(String topic) throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress()) + .operationTimeout(1, TimeUnit.SECONDS).build(); + final AtomicInteger producerCounter = new AtomicInteger(0); + final AtomicInteger closeProducerCounter = new AtomicInteger(0); + mockBrokerService.setHandleProducer((ctx, producer) -> { + int producerCount = producerCounter.incrementAndGet(); + if (producerCount == 1) { + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "producer1", + SchemaVersion.Empty)); + // Trigger reconnect + ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1)); + } else if (producerCount != 2) { + // Respond to subsequent requests to prevent timeouts + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "producer1", + SchemaVersion.Empty)); + } + // Don't respond to the second Producer command to ensure timeout + }); + + mockBrokerService.setHandleCloseProducer((ctx, 
closeProducer) -> { + closeProducerCounter.incrementAndGet(); + ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId())); + }); + + // Create producer should succeed then upon closure, it should reattempt creation. The first request will + // time out, which triggers CloseProducer. The client might send the third Producer command before the + // below assertion, so we pass with 2 or 3. + client.newProducer().topic(topic).create(); + Awaitility.await().until(() -> closeProducerCounter.get() == 1); + Awaitility.await().until(() -> producerCounter.get() == 2 || producerCounter.get() == 3); mockBrokerService.resetHandleProducer(); + mockBrokerService.resetHandleCloseProducer(); } @Test @@ -491,7 +547,6 @@ public class ClientErrorsTest { mockBrokerService.resetHandleProducer(); mockBrokerService.resetHandleCloseProducer(); - client.close(); } // failed to connect to partition at sending step if a producer which connects to broker as lazy-loading mode @@ -552,7 +607,6 @@ public class ClientErrorsTest { mockBrokerService.resetHandleProducer(); mockBrokerService.resetHandleCloseProducer(); - client.close(); } // if a producer which doesn't connect as lazy-loading mode fails to connect while creating partitioned producer, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index e33fb5f..686fa7d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1483,8 +1483,18 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne cnx.channel().close(); return null; } + + if (cause instanceof TimeoutException) { + // Creating the producer has timed out. 
We need to ensure the broker closes the producer + // in case it was indeed created, otherwise it might prevent new create producer operation, + // since we are not necessarily closing the connection. + long closeRequestId = client.newRequestId(); + ByteBuf cmd = Commands.newCloseProducer(producerId, closeRequestId); + cnx.sendRequestWithId(cmd, closeRequestId); + } + log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage()); - // Close the producer since topic does not exists. + // Close the producer since topic does not exist. if (cause instanceof PulsarClientException.TopicDoesNotExistException) { closeAsync().whenComplete((v, ex) -> { if (ex != null) {
