codelipenghui edited a comment on issue #3131: Consumer stop receive messages from broker URL: https://github.com/apache/pulsar/issues/3131#issuecomment-452700099 This is a test case to show how consumer stop consume messages: ```java package com.zhaopin.pulsar.issues; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class ConsumerStopReceiveMessages { public static void main(String[] args) throws PulsarClientException { final String topic = "your-topic"; /** * Broker configs: * * maxUnackedMessagesPerConsumer=500 * maxUnackedMessagesPerSubscription=2000 */ PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build(); // Create a producer with enable message batching Producer<byte[]> producer = client.newProducer() .topic(topic) .batchingMaxMessages(1000) .batchingMaxPublishDelay(30, TimeUnit.SECONDS) .blockIfQueueFull(true) .maxPendingMessages(1000) .enableBatching(true) .create(); // Create 6 consumers List<Consumer<byte[]>> consumers = new ArrayList<>(); for (int i = 0; i < 6; i++) { consumers.add(client.newConsumer() .topic(topic) .subscriptionType(SubscriptionType.Shared) .subscriptionName("test") .ackTimeout(1, TimeUnit.SECONDS) .receiverQueueSize(100) .subscribe()); } // Producer start publish messages new Thread(() -> { int index = 0; for (; ; ) { producer.sendAsync((index++ + "").getBytes()); } }).start(); // Consumers start consume messages consumers.forEach(consumer -> new Thread(() -> { do { // can't receive message try { Message<byte[]> msg = consumer.receive(); System.out.println("Message Received [x] " + consumer.getConsumerName() + " --- [" + msg.getMessageId() + "]" + " ---- " + new String(msg.getValue())); //Do not ack messages, wait broker redelivery } catch (Exception e) { e.printStackTrace(); } } while (true); }).start()); } } ``` After a while, you will get following logs: ``` Message Received [x] 78cab --- [15361:48:-1:990] ---- 0 Message Received [x] 78cab --- [15361:48:-1:991] ---- 0 Message Received [x] 78cab --- [15361:48:-1:992] ---- 0 Message Received [x] 78cab --- [15361:48:-1:993] ---- 0 Message Received [x] 78cab --- [15361:48:-1:994] ---- 0 Message Received [x] 78cab --- [15361:48:-1:995] ---- 0 Message Received [x] 78cab --- [15361:48:-1:996] ---- 0 Message Received [x] 78cab --- [15361:48:-1:997] ---- 0 Message Received [x] 78cab --- [15361:48:-1:998] ---- 0 Message Received [x] 78cab --- [15361:48:-1:999] ---- 0 [ConsumerBase{subscription='test', consumerName='78cab', topic='your-topic'}] 9 messages have timed-out [your-topic] [pulsar-api-test-14-3291] Pending messages: 1 --- Publish throughput: 7583.21 msg/s --- 0.33 Mbit/s --- Latency: med: 222.155 ms - 95pct: 587.127 ms - 99pct: 666.942 ms - 99.9pct: 766.028 ms - max: 766.102 ms --- Ack received rate: 7566.54 ack/s --- Failed messages: 0 [your-topic] [test] [2a5ea] Prefetched messages: 0 --- Consume throughput: 16.45 msgs/s --- Throughput received: 0.00 msg/s --- 0.00 Mbit/s --- Ack sent rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {} [your-topic] [test] [78cab] Prefetched messages: 0 --- Consume throughput: 332.48 msgs/s --- Throughput received: 0.00 msg/s --- 0.00 Mbit/s --- Ack sent rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {} ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services