kimcs opened a new issue #5073: java-client: Consumer closed after using 
seek(timestamp)
URL: https://github.com/apache/pulsar/issues/5073
 
 
   **Describe the bug**
   Consumer is closed after a seek(timestamp) operation.
   
   **To Reproduce**
   ```java
   package sample;
   
   import org.apache.pulsar.client.admin.PulsarAdmin;
   import org.apache.pulsar.client.admin.PulsarAdminException;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
   import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
   import org.testng.annotations.Test;
   
   import java.util.List;
   
   public class SeekTimestampTest {
   
       static final String topic = 
"public/default/seek-test-timestamp-magic-h509db3";
   
       void deleteTopic(String topic) throws PulsarClientException {
           ClientConfigurationData config = new ClientConfigurationData();
           config.setAuthentication(new AuthenticationDisabled());
           config.setServiceUrl("http://localhost:8080";);
           try (PulsarAdmin admin = new PulsarAdmin("http://localhost:8080";, 
config)) {
               List<String> topics = 
admin.namespaces().getTopics("public/default");
               for (String topicName : topics) {
                   if (topicName.contains(topic)) {
                       admin.topics().delete(topic);
                       break;
                   }
               }
           } catch (PulsarAdminException e) {
               throw new RuntimeException(e);
           }
       }
   
       void produceMessages(PulsarClient client, int n) throws 
PulsarClientException {
           try (Producer<byte[]> producer = 
client.newProducer().topic(topic).create()) {
               for (int i = 0; i < n; i++) {
                   MessageId messageId = producer.send(new byte[]{(byte) (1 + 
i)});
                   System.out.printf("Produced message: %s%n", messageId);
               }
           }
       }
   
       @Test
       public void thatSeekTimestampDoesNotCloseConsumer() throws 
PulsarClientException, InterruptedException {
           deleteTopic(topic);
   
           try (PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
               try (Consumer<byte[]> consumer = 
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe()) {
                   try {
                       long someTimestamp = System.currentTimeMillis();
                       produceMessages(client, 10);
                       consumer.seek(someTimestamp);
                   } finally {
                       consumer.unsubscribe(); // FAILS with "Not connected to 
broker"
                   }
               }
           }
       }
   
   }
   ```
   
   **Expected behavior**
   That seek(timestamp) should never close the consumer.
   
   **Additional context**
   In attempting to reproduce this bug I have also seen the consumer fail 
before calling unsubscribe on it, e.g. when calling receive on it, but that was 
harder to reproduce consistently.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to