cbornet opened a new issue #12281:
URL: https://github.com/apache/pulsar/issues/12281


   **Describe the bug**
   When `redeliverUnacknowledgedMessages()` or 
`redeliverUnacknowledgedMessages(Set<MessageId> messageIds)` is called on a 
consumer that is closing or closed, the code closes the client connection. 
This causes unexpected behavior for the other consumers of the same client. 
See the sample code below for an example.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   ```java
     @Test
     void simpletest() throws Exception {
       PulsarClient pulsarClient =
           PulsarClient.builder().serviceUrl(cluster.getAddress()).build();
   
       ConsumerBuilder<String> consumerBuilder = pulsarClient
           .newConsumer(Schema.STRING)
           .topic("test")
           .subscriptionName("test")
           .subscriptionType(SubscriptionType.Shared)
           .negativeAckRedeliveryDelay(0, TimeUnit.SECONDS);
   
       Consumer<String> consumer1 = consumerBuilder.subscribe();
       Consumer<String> consumer2 = consumerBuilder.subscribe();
   
       Producer<String> producer =
           pulsarClient.newProducer(Schema.STRING).topic("test").create();
   
       producer.send("hello");
       producer.send("world");
   
       Message<String> receive1 = consumer1.receive();
       Message<String> receive2 = consumer2.receive();
   
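       // Negatively acknowledge on consumer1, then close it right away.
       consumer1.negativeAcknowledge(receive1);
       consumer1.close();
   
       // Leave time for the redelivery to happen.
       Thread.sleep(5000);
   
       Message<String> receive3 = consumer2.receive(5, TimeUnit.SECONDS);
       Message<String> receive4 = consumer2.receive(5, TimeUnit.SECONDS);
   
       // Expected: consumer2 gets the message nacked by consumer1 and nothing else.
       assertEquals("hello", receive3.getValue());
       assertNull(receive4);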
     }
   ```
   
   **Expected behavior**
   This test should pass, but it fails because the `pulsarClient` connection 
is re-established, so `consumer2` also reconnects and its unacked messages are 
redelivered. As a result, `receive4` contains `world` instead of being null.
   
   **Additional context**
   The code that closes the connection is 
[here](https://github.com/apache/pulsar/blob/4147db88bc08eb3b2dab97a45af178fd9f4c7a6b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1610).
 I'm not sure why we need to restart the connection here. If the consumer is 
closing or closed, the broker will deliver its unacked messages to another 
consumer anyway, so why does the client need to reconnect completely?
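
   To make the question concrete, here is a minimal, self-contained sketch of the alternative behavior: a redelivery request on a consumer that is no longer ready becomes a no-op instead of closing the shared connection. Everything in it is hypothetical (`RedeliverGuardSketch`, `SharedConnection`, `SketchConsumer`, the `State` enum); it does not reproduce the actual `ConsumerImpl` code and is only meant to illustrate the idea.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical model, NOT the Pulsar client implementation.
   public class RedeliverGuardSketch {

       public enum State { READY, CLOSING, CLOSED }

       /** Stand-in for the connection shared by all consumers of one client. */
       public static final class SharedConnection {
           public final AtomicInteger closeCount = new AtomicInteger();
           public final AtomicInteger redeliverRequests = new AtomicInteger();

           public void close() { closeCount.incrementAndGet(); }
           public void sendRedeliver() { redeliverRequests.incrementAndGet(); }
       }

       /** Stand-in for a consumer attached to the shared connection. */
       public static final class SketchConsumer {
           private final SharedConnection cnx;
           private volatile State state = State.READY;

           public SketchConsumer(SharedConnection cnx) { this.cnx = cnx; }

           public void close() { state = State.CLOSED; }

           /**
            * Returns true if a redelivery request was sent. When the consumer
            * is closing or closed, nothing is sent and the shared connection
            * is left untouched (assumption: the broker redelivers the unacked
            * messages of a closed consumer on its own).
            */
           public boolean redeliverUnacknowledgedMessages() {
               if (state != State.READY) {
                   return false; // no-op: do not close the shared connection
               }
               cnx.sendRedeliver();
               return true;
           }
       }
   }
   ```

   With a guard like this, a late redelivery triggered on `consumer1` after `close()` would leave `consumer2` connected, which is what the test above expects.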
   



