This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4fba95abb701de186235ff6115955acb6e389a64
Author: Michael Marshall <[email protected]>
AuthorDate: Sat Jul 30 10:31:30 2022 +0800

    [Java Client] Send CloseConsumer on timeout (#16616)
    
    (cherry picked from commit 8f316558e2b3204cd197cd61f7173d64987fc918)
---
 .../apache/pulsar/client/api/ClientErrorsTest.java | 47 ++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 69 +++++++++++++---------
 site2/docs/developing-binary-protocol.md           |  9 +++
 3 files changed, 95 insertions(+), 30 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index d7507d2b47c..d98f0d57da0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -240,7 +240,7 @@ public class ClientErrorsTest {
         });
 
         // Create producer should succeed then upon closure, it should 
reattempt creation. The first request will
-        // timeout, which triggers CloseProducer. The client might send send 
the third Producer command before the
+        // time out, which triggers CloseProducer. The client might send the 
third Producer command before the
         // below assertion, so we pass with 2 or 3.
         client.newProducer().topic(topic).create();
         Awaitility.await().until(() -> closeProducerCounter.get() == 1);
@@ -249,6 +249,51 @@ public class ClientErrorsTest {
         mockBrokerService.resetHandleCloseProducer();
     }
 
+    @Test
+    public void testCreatedConsumerSendsCloseConsumerAfterTimeout() throws 
Exception {
+        consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1");
+    }
+
+    @Test
+    public void testCreatedPartitionedConsumerSendsCloseConsumerAfterTimeout() 
throws Exception {
+        
consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1");
+    }
+
+    private void consumerCreatedThenFailsRetryTimeout(String topic) throws 
Exception {
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
+                .operationTimeout(1, TimeUnit.SECONDS).build();
+        final AtomicInteger subscribeCounter = new AtomicInteger(0);
+        final AtomicInteger closeConsumerCounter = new AtomicInteger(0);
+
+        mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
+            int subscribeCount = subscribeCounter.incrementAndGet();
+            if (subscribeCount == 1) {
+                
ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
+                // Trigger reconnect
+                
ctx.writeAndFlush(Commands.newCloseConsumer(subscribe.getConsumerId(), -1));
+            } else if (subscribeCount != 2) {
+                // Respond to subsequent requests to prevent timeouts
+                
ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
+            }
+            // Don't respond to the second Subscribe command to ensure timeout
+        });
+
+        mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> {
+            closeConsumerCounter.incrementAndGet();
+            
ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId()));
+        });
+
+        // Create consumer (subscribe) should succeed then upon closure, it 
should reattempt creation. The first
+        // request will time out, which triggers CloseConsumer. The client 
might send the third Subscribe command before
+        // the below assertion, so we pass with 2 or 3.
+        client.newConsumer().topic(topic).subscriptionName("test").subscribe();
+        Awaitility.await().until(() -> closeConsumerCounter.get() == 1);
+        Awaitility.await().until(() -> subscribeCounter.get() == 2 || 
subscribeCounter.get() == 3);
+        mockBrokerService.resetHandleSubscribe();
+        mockBrokerService.resetHandleCloseConsumer();
+    }
+
     @Test
     public void testProducerFailDoesNotFailOtherProducer() throws Exception {
         producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", 
"persistent://prop/use/ns/t2");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ef06a2e71aa..0079492be07 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -792,35 +792,46 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             }
             log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, 
subscription, cnx.channel().remoteAddress());
 
-            if (e.getCause() instanceof PulsarClientException
-                    && PulsarClientException.isRetriableError(e.getCause())
-                    && System.currentTimeMillis() < 
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
-                reconnectLater(e.getCause());
-            } else if (!subscribeFuture.isDone()) {
-                // unable to create new consumer, fail operation
-                setState(State.Failed);
-                closeConsumerTasks();
-                subscribeFuture.completeExceptionally(
-                    PulsarClientException.wrap(e, String.format("Failed to 
subscribe the topic %s with subscription " +
-                        "name %s when connecting to the broker", 
topicName.toString(), subscription)));
-                client.cleanupConsumer(this);
-            } else if (e.getCause() instanceof TopicDoesNotExistException) {
-                // The topic was deleted after the consumer was created, and 
we're
-                // not allowed to recreate the topic. This can happen in few 
cases:
-                //  * Regex consumer getting error after topic gets deleted
-                //  * Regular consumer after topic is manually delete and with
-                //    auto-topic-creation set to false
-                // No more retries are needed in this case.
-                setState(State.Failed);
-                closeConsumerTasks();
-                client.cleanupConsumer(this);
-                log.warn("[{}][{}] Closed consumer because topic does not 
exist anymore {}", topic, subscription, cnx.channel().remoteAddress());
-            } else {
-                // consumer was subscribed and connected but we got some 
error, keep trying
-                reconnectLater(e.getCause());
-            }
-            return null;
-        });
+                if (e.getCause() instanceof 
PulsarClientException.TimeoutException) {
+                    // Creating the consumer has timed out. We need to ensure 
the broker closes the consumer
+                    // in case it was indeed created, otherwise it might 
prevent new create consumer operation,
+                    // since we are not necessarily closing the connection.
+                    long closeRequestId = client.newRequestId();
+                    ByteBuf cmd = Commands.newCloseConsumer(consumerId, 
closeRequestId);
+                    cnx.sendRequestWithId(cmd, closeRequestId);
+                }
+
+                if (e.getCause() instanceof PulsarClientException
+                        && PulsarClientException.isRetriableError(e.getCause())
+                        && System.currentTimeMillis() < 
SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
+                    reconnectLater(e.getCause());
+                } else if (!subscribeFuture.isDone()) {
+                    // unable to create new consumer, fail operation
+                    setState(State.Failed);
+                    closeConsumerTasks();
+                    subscribeFuture.completeExceptionally(
+                            PulsarClientException.wrap(e, 
String.format("Failed to subscribe the topic %s "
+                                            + "with subscription name %s when 
connecting to the broker",
+                                    topicName.toString(), subscription)));
+                    client.cleanupConsumer(this);
+                } else if (e.getCause() instanceof TopicDoesNotExistException) 
{
+                    // The topic was deleted after the consumer was created, 
and we're
+                    // not allowed to recreate the topic. This can happen in 
few cases:
+                    //  * Regex consumer getting error after topic gets deleted
+                    //  * Regular consumer after topic is manually delete and 
with
+                    //    auto-topic-creation set to false
+                    // No more retries are needed in this case.
+                    setState(State.Failed);
+                    closeConsumerTasks();
+                    client.cleanupConsumer(this);
+                    log.warn("[{}][{}] Closed consumer because topic does not 
exist anymore {}",
+                            topic, subscription, 
cnx.channel().remoteAddress());
+                } else {
+                    // consumer was subscribed and connected but we got some 
error, keep trying
+                    reconnectLater(e.getCause());
+                }
+                return null;
+            });
     }
 
     protected void consumerIsReconnectedToBroker(ClientCnx cnx, int 
currentQueueSize) {
diff --git a/site2/docs/developing-binary-protocol.md b/site2/docs/developing-binary-protocol.md
index 33861af0da7..85562432263 100644
--- a/site2/docs/developing-binary-protocol.md
+++ b/site2/docs/developing-binary-protocol.md
@@ -279,6 +279,10 @@ subscription is not already there, a new one will be created.
 
 ![Consumer](assets/binary-protocol-consumer.png)
 
+If the client does not receive a response indicating consumer creation success 
or failure,
+the client should first send a command to close the original consumer before 
sending a
+command to re-attempt consumer creation.
+
 #### Flow control
 
 After the consumer is ready, the client needs to *give permission* to the
@@ -388,6 +392,11 @@ Parameters:
 
 This command behaves the same as [`CloseProducer`](#command-closeproducer)
 
+If the client does not receive a response to a `Subscribe` command within a 
timeout,
+the client must first send a `CloseConsumer` command before sending another
+`Subscribe` command. The client does not need to await a response to the 
`CloseConsumer`
+command before sending the next `Subscribe` command.
+
 ##### Command RedeliverUnacknowledgedMessages
 
 A consumer can ask the broker to redeliver some or all of the pending messages

Reply via email to