eolivelli opened a new issue #10813:
URL: https://github.com/apache/pulsar/issues/10813
I am investigating a case in which the consumer is stuck while there are
still many messages to read from a partitioned topic.
At high level the test is:
- create a partitioned topic with 100 partitions
- produce 1.000.000 messages
- consume the message
Desired outcome: the consumer is able to fully consume the messages
Actual outcome (current 2.8.0-SNAPSHOT): the consumer is stuck at
approximately 80% of the topic
Steps to reproduce:
- install a brand new Pulsar cluster, 2.8.0-SNAPSHOT (I am using k8s and the
helm chart)
- 3 bookies, replication 3x3x3, 1 broker, 1 proxy
- set retention: `bin/pulsar-admin namespaces set-retention -s 3T -t 3w
public/default`
- create topic: `bin/pulsar-admin topics create-partitioned-topic -p 100
test`
- create the "shared" subscription: `bin/pulsar-perf consume -ss nb -st
Shared test`
- verify stats: `bin/pulsar-admin topics partitioned-stats test`
- produce messages: `bin/pulsar-perf produce -r 10000 -m 1000000 -mk random
test`
- consume the messages using a very simple Java client program (listing
below)
- compile the program:
-- cd "/pulsar"
-- write SimpleConsumeMain.java
-- javac SimpleConsumeMain.java -cp "lib/*"
- run the program `java -cp ".:lib/*" SimpleConsumeMain
pulsar://pulsar-broker:6650 nb test`
The program should consume the subscription, with 1.000.000 messages
```
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
public class SimpleConsumeMain {
public static void main(String ... args) throws Exception {
String broker = args[0];
String subscription = args[1];
String topic = args[2];
PulsarClient client = PulsarClient
.builder()
.serviceUrl(broker)
.build();
Consumer<byte[]> consumer = client.newConsumer()
.subscriptionName(subscription)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Shared)
.topic(topic)
.subscribe();
long count = 0;
while (true) {
Message<byte[]> msg = consumer.receive();
count++;
consumer.acknowledge(msg);
if (count % 1000 == 0) {
System.out.println("Received "+count+" messages");
}
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]