eolivelli opened a new issue #10813:
URL: https://github.com/apache/pulsar/issues/10813


   I am investigating a case in which the consumer is stuck while there are 
still many messages to read from a partitioned topic.
   
   At high level the test is:
   - create a partitioned topic with 100 partitions
   - produce 1.000.000 messages
   - consume the message
   
   Desired outcome: the consumer is able to fully consume the messages 
   Actual outcome (current 2.8.0-SNAPSHOT): the consumer is stuck at 
approximately 80% of the topic
   
   Steps to reproduce:
   - install a brand new Pulsar cluster, 2.8.0-SNAPSHOT (I am using k8s and the 
helm chart)
   - 3 bookies, replication 3x3x3, 1 broker, 1 proxy
   - set retention: `bin/pulsar-admin namespaces set-retention -s 3T -t 3w 
public/default`
   - create topic: `bin/pulsar-admin topics create-partitioned-topic -p 100 
test`
   - create the "shared" subscription: `bin/pulsar-perf consume -ss nb -st 
Shared test`
   - verify stats: `bin/pulsar-admin topics partitioned-stats test`
   - produce messages: `bin/pulsar-perf produce -r 10000 -m 1000000 -mk random 
test`
   - consume the messages using a very simple Java client program (listing 
below)
   - compile the program: 
   -- cd "/pulsar"
   -- write SimpleConsumeMain.java
   -- javac SimpleConsumeMain.java  -cp "lib/*"
   - run the program `java -cp ".:lib/*" SimpleConsumeMain 
pulsar://pulsar-broker:6650 nb test`
   
   The program should consume the subscription, with 1.000.000 messages
    
   
   ```
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.SubscriptionInitialPosition;
   import org.apache.pulsar.client.api.SubscriptionType;
   
   public class SimpleConsumeMain {
       public static void main(String ... args) throws Exception {
           String broker = args[0];
           String subscription = args[1];
           String topic = args[2];
           PulsarClient client = PulsarClient
                   .builder()
                   .serviceUrl(broker)
                   .build();
           Consumer<byte[]> consumer = client.newConsumer()
                 .subscriptionName(subscription)
                   
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .subscriptionType(SubscriptionType.Shared)
                 .topic(topic)
                 .subscribe();
           long count = 0;
           while (true) {
               Message<byte[]> msg = consumer.receive();
               count++;
               consumer.acknowledge(msg);
               if (count % 1000 == 0) {
                   System.out.println("Received "+count+" messages");
               }
           }
       }
   }
   
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to