This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c2bb5530b48fd15b0cf7b6bc7d2fc6916006d6cf Author: Michael Marshall <[email protected]> AuthorDate: Fri Jul 29 21:31:30 2022 -0500 [Java Client] Send CloseConsumer on timeout (#16616) (cherry picked from commit 8f316558e2b3204cd197cd61f7173d64987fc918) --- .../apache/pulsar/client/api/ClientErrorsTest.java | 47 +++++++++++++++++++++- .../apache/pulsar/client/impl/ConsumerImpl.java | 9 +++++ site2/docs/developing-binary-protocol.md | 9 +++++ 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index 4353297666a..ec4d56e6fbc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -240,7 +240,7 @@ public class ClientErrorsTest { }); // Create producer should succeed then upon closure, it should reattempt creation. The first request will - // timeout, which triggers CloseProducer. The client might send send the third Producer command before the + // time out, which triggers CloseProducer. The client might send the third Producer command before the // below assertion, so we pass with 2 or 3. client.newProducer().topic(topic).create(); Awaitility.await().until(() -> closeProducerCounter.get() == 1); @@ -249,6 +249,51 @@ public class ClientErrorsTest { mockBrokerService.resetHandleCloseProducer(); } + @Test + public void testCreatedConsumerSendsCloseConsumerAfterTimeout() throws Exception { + consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1"); + } + + @Test + public void testCreatedPartitionedConsumerSendsCloseConsumerAfterTimeout() throws Exception { + consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1"); + } + + private void consumerCreatedThenFailsRetryTimeout(String topic) throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress()) + .operationTimeout(1, TimeUnit.SECONDS).build(); + final AtomicInteger subscribeCounter = new AtomicInteger(0); + final AtomicInteger closeConsumerCounter = new AtomicInteger(0); + + mockBrokerService.setHandleSubscribe((ctx, subscribe) -> { + int subscribeCount = subscribeCounter.incrementAndGet(); + if (subscribeCount == 1) { + ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId())); + // Trigger reconnect + ctx.writeAndFlush(Commands.newCloseConsumer(subscribe.getConsumerId(), -1)); + } else if (subscribeCount != 2) { + // Respond to subsequent requests to prevent timeouts + ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId())); + } + // Don't respond to the second Subscribe command to ensure timeout + }); + + mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> { + closeConsumerCounter.incrementAndGet(); + ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId())); + }); + + // Create consumer (subscribe) should succeed then upon closure, it should reattempt creation. The first + // request will time out, which triggers CloseConsumer. The client might send the third Subscribe command before + // the below assertion, so we pass with 2 or 3. + client.newConsumer().topic(topic).subscriptionName("test").subscribe(); + Awaitility.await().until(() -> closeConsumerCounter.get() == 1); + Awaitility.await().until(() -> subscribeCounter.get() == 2 || subscribeCounter.get() == 3); + mockBrokerService.resetHandleSubscribe(); + mockBrokerService.resetHandleCloseConsumer(); + } + @Test public void testProducerFailDoesNotFailOtherProducer() throws Exception { producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2bef30012b4..69282f6bea6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -867,6 +867,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress()); + if (e.getCause() instanceof PulsarClientException.TimeoutException) { + // Creating the consumer has timed out. We need to ensure the broker closes the consumer + // in case it was indeed created, otherwise it might prevent new create consumer operation, + // since we are not necessarily closing the connection. + long closeRequestId = client.newRequestId(); + ByteBuf cmd = Commands.newCloseConsumer(consumerId, closeRequestId); + cnx.sendRequestWithId(cmd, closeRequestId); + } + if (e.getCause() instanceof PulsarClientException && PulsarClientException.isRetriableError(e.getCause()) && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) { diff --git a/site2/docs/developing-binary-protocol.md b/site2/docs/developing-binary-protocol.md index 3caf769ad2c..fdab6c73465 100644 --- a/site2/docs/developing-binary-protocol.md +++ b/site2/docs/developing-binary-protocol.md @@ -310,6 +310,10 @@ subscription is not already there, a new one will be created.  +If the client does not receive a response indicating consumer creation success or failure, +the client should first send a command to close the original consumer before sending a +command to re-attempt consumer creation. + #### Flow control After the consumer is ready, the client needs to *give permission* to the @@ -419,6 +423,11 @@ Parameters: This command behaves the same as [`CloseProducer`](#command-closeproducer) +If the client does not receive a response to a `Subscribe` command within a timeout, +the client must first send a `CloseConsumer` command before sending another +`Subscribe` command. The client does not need to await a response to the `CloseConsumer` +command before sending the next `Subscribe` command. + ##### Command RedeliverUnacknowledgedMessages A consumer can ask the broker to redeliver some or all of the pending messages
