This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4fba95abb701de186235ff6115955acb6e389a64 Author: Michael Marshall <[email protected]> AuthorDate: Sat Jul 30 10:31:30 2022 +0800 [Java Client] Send CloseConsumer on timeout (#16616) (cherry picked from commit 8f316558e2b3204cd197cd61f7173d64987fc918) --- .../apache/pulsar/client/api/ClientErrorsTest.java | 47 ++++++++++++++- .../apache/pulsar/client/impl/ConsumerImpl.java | 69 +++++++++++++--------- site2/docs/developing-binary-protocol.md | 9 +++ 3 files changed, 95 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index d7507d2b47c..d98f0d57da0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -240,7 +240,7 @@ public class ClientErrorsTest { }); // Create producer should succeed then upon closure, it should reattempt creation. The first request will - // timeout, which triggers CloseProducer. The client might send send the third Producer command before the + // time out, which triggers CloseProducer. The client might send the third Producer command before the // below assertion, so we pass with 2 or 3. client.newProducer().topic(topic).create(); Awaitility.await().until(() -> closeProducerCounter.get() == 1); @@ -249,6 +249,51 @@ public class ClientErrorsTest { mockBrokerService.resetHandleCloseProducer(); } + @Test + public void testCreatedConsumerSendsCloseConsumerAfterTimeout() throws Exception { + consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1"); + } + + @Test + public void testCreatedPartitionedConsumerSendsCloseConsumerAfterTimeout() throws Exception { + consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1"); + } + + private void consumerCreatedThenFailsRetryTimeout(String topic) throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress()) + .operationTimeout(1, TimeUnit.SECONDS).build(); + final AtomicInteger subscribeCounter = new AtomicInteger(0); + final AtomicInteger closeConsumerCounter = new AtomicInteger(0); + + mockBrokerService.setHandleSubscribe((ctx, subscribe) -> { + int subscribeCount = subscribeCounter.incrementAndGet(); + if (subscribeCount == 1) { + ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId())); + // Trigger reconnect + ctx.writeAndFlush(Commands.newCloseConsumer(subscribe.getConsumerId(), -1)); + } else if (subscribeCount != 2) { + // Respond to subsequent requests to prevent timeouts + ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId())); + } + // Don't respond to the second Subscribe command to ensure timeout + }); + + mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> { + closeConsumerCounter.incrementAndGet(); + ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId())); + }); + + // Create consumer (subscribe) should succeed then upon closure, it should reattempt creation. The first + // request will time out, which triggers CloseConsumer. The client might send the third Subscribe command before + // the below assertion, so we pass with 2 or 3. + client.newConsumer().topic(topic).subscriptionName("test").subscribe(); + Awaitility.await().until(() -> closeConsumerCounter.get() == 1); + Awaitility.await().until(() -> subscribeCounter.get() == 2 || subscribeCounter.get() == 3); + mockBrokerService.resetHandleSubscribe(); + mockBrokerService.resetHandleCloseConsumer(); + } + @Test public void testProducerFailDoesNotFailOtherProducer() throws Exception { producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ef06a2e71aa..0079492be07 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -792,35 +792,46 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress()); - if (e.getCause() instanceof PulsarClientException - && PulsarClientException.isRetriableError(e.getCause()) - && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) { - reconnectLater(e.getCause()); - } else if (!subscribeFuture.isDone()) { - // unable to create new consumer, fail operation - setState(State.Failed); - closeConsumerTasks(); - subscribeFuture.completeExceptionally( - PulsarClientException.wrap(e, String.format("Failed to subscribe the topic %s with subscription " + - "name %s when connecting to the broker", topicName.toString(), subscription))); - client.cleanupConsumer(this); - } else if (e.getCause() instanceof TopicDoesNotExistException) { - // The topic was deleted after the consumer was created, and we're - // not allowed to recreate the topic. This can happen in few cases: - // * Regex consumer getting error after topic gets deleted - // * Regular consumer after topic is manually delete and with - // auto-topic-creation set to false - // No more retries are needed in this case. - setState(State.Failed); - closeConsumerTasks(); - client.cleanupConsumer(this); - log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}", topic, subscription, cnx.channel().remoteAddress()); - } else { - // consumer was subscribed and connected but we got some error, keep trying - reconnectLater(e.getCause()); - } - return null; - }); + if (e.getCause() instanceof PulsarClientException.TimeoutException) { + // Creating the consumer has timed out. We need to ensure the broker closes the consumer + // in case it was indeed created, otherwise it might prevent new create consumer operation, + // since we are not necessarily closing the connection. + long closeRequestId = client.newRequestId(); + ByteBuf cmd = Commands.newCloseConsumer(consumerId, closeRequestId); + cnx.sendRequestWithId(cmd, closeRequestId); + } + + if (e.getCause() instanceof PulsarClientException + && PulsarClientException.isRetriableError(e.getCause()) + && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) { + reconnectLater(e.getCause()); + } else if (!subscribeFuture.isDone()) { + // unable to create new consumer, fail operation + setState(State.Failed); + closeConsumerTasks(); + subscribeFuture.completeExceptionally( + PulsarClientException.wrap(e, String.format("Failed to subscribe the topic %s " + + "with subscription name %s when connecting to the broker", + topicName.toString(), subscription))); + client.cleanupConsumer(this); + } else if (e.getCause() instanceof TopicDoesNotExistException) { + // The topic was deleted after the consumer was created, and we're + // not allowed to recreate the topic. This can happen in few cases: + // * Regex consumer getting error after topic gets deleted + // * Regular consumer after topic is manually delete and with + // auto-topic-creation set to false + // No more retries are needed in this case. + setState(State.Failed); + closeConsumerTasks(); + client.cleanupConsumer(this); + log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}", + topic, subscription, cnx.channel().remoteAddress()); + } else { + // consumer was subscribed and connected but we got some error, keep trying + reconnectLater(e.getCause()); + } + return null; + }); } protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize) { diff --git a/site2/docs/developing-binary-protocol.md b/site2/docs/developing-binary-protocol.md index 33861af0da7..85562432263 100644 --- a/site2/docs/developing-binary-protocol.md +++ b/site2/docs/developing-binary-protocol.md @@ -279,6 +279,10 @@ subscription is not already there, a new one will be created.  +If the client does not receive a response indicating consumer creation success or failure, +the client should first send a command to close the original consumer before sending a +command to re-attempt consumer creation. + #### Flow control After the consumer is ready, the client needs to *give permission* to the @@ -388,6 +392,11 @@ Parameters: This command behaves the same as [`CloseProducer`](#command-closeproducer) +If the client does not receive a response to a `Subscribe` command within a timeout, +the client must first send a `CloseConsumer` command before sending another +`Subscribe` command. The client does not need to await a response to the `CloseConsumer` +command before sending the next `Subscribe` command. + ##### Command RedeliverUnacknowledgedMessages A consumer can ask the broker to redeliver some or all of the pending messages
