codelipenghui edited a comment on issue #3131: Consumer stop receive messages 
from broker
URL: https://github.com/apache/pulsar/issues/3131#issuecomment-452700099
 
 
   This is a test case that shows how a consumer stops consuming messages:
   
   ```java
   package com.zhaopin.pulsar.issues;
   
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.SubscriptionType;
   
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.TimeUnit;
   
   /**
    * Reproduction case for consumers on a shared subscription that stop receiving
    * messages from the broker.
    *
    * <p>One batching producer publishes without pause while several consumers on the
    * same {@code Shared} subscription receive messages and deliberately never
    * acknowledge them, waiting for the broker to redeliver instead.
    *
    * <p>Broker configs used for the reproduction:
    * <pre>
    * maxUnackedMessagesPerConsumer=500
    * maxUnackedMessagesPerSubscription=2000
    * </pre>
    */
   public class ConsumerStopReceiveMessages {
   
       /** Topic shared by the producer and every consumer. */
       private static final String TOPIC = "your-topic";
   
       /** Number of consumers attached to the shared subscription. */
       private static final int CONSUMER_COUNT = 6;
   
       /**
        * Starts one publishing thread and one receiving thread per consumer.
        * The threads never terminate; stop the process manually once the stall
        * has been observed.
        *
        * @param args unused
        * @throws PulsarClientException if the client, producer or a consumer cannot be created
        */
       public static void main(String[] args) throws PulsarClientException {
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("pulsar://localhost:6650")
                   .build();
   
           // Create a producer with message batching enabled
           Producer<byte[]> producer = client.newProducer()
                   .topic(TOPIC)
                   .batchingMaxMessages(1000)
                   .batchingMaxPublishDelay(30, TimeUnit.SECONDS)
                   .blockIfQueueFull(true)
                   .maxPendingMessages(1000)
                   .enableBatching(true)
                   .create();
   
           // Create the consumers, all on the same shared subscription
           List<Consumer<byte[]>> consumers = new ArrayList<>(CONSUMER_COUNT);
           for (int i = 0; i < CONSUMER_COUNT; i++) {
               consumers.add(client.newConsumer()
                       .topic(TOPIC)
                       .subscriptionType(SubscriptionType.Shared)
                       .subscriptionName("test")
                       .ackTimeout(1, TimeUnit.SECONDS)
                       .receiverQueueSize(100)
                       .subscribe());
           }
   
           // Producer starts publishing messages
           new Thread(() -> publishForever(producer)).start();
   
           // Consumers start consuming messages, one thread per consumer
           consumers.forEach(consumer -> new Thread(() -> receiveForever(consumer)).start());
       }
   
       /** Publishes an ever-increasing counter, encoded as UTF-8 text, without pause. */
       private static void publishForever(Producer<byte[]> producer) {
           int index = 0;
           for (; ; ) {
               // Explicit charset: the payload must not depend on the JVM's default encoding.
               producer.sendAsync(String.valueOf(index++).getBytes(StandardCharsets.UTF_8));
           }
       }
   
       /** Receives and prints messages forever, intentionally never acknowledging them. */
       private static void receiveForever(Consumer<byte[]> consumer) {
           while (true) {
               try {
                   // This is the call that stops returning messages in the reported issue.
                   Message<byte[]> msg = consumer.receive();
                   System.out.println("Message Received [x] " + consumer.getConsumerName()
                           + " --- [" + msg.getMessageId() + "]" + " ---- "
                           + new String(msg.getValue(), StandardCharsets.UTF_8));
                   // Do not ack messages; wait for broker redelivery
               } catch (Exception e) {
                   // Best effort: report the failure and keep the receive loop alive.
                   e.printStackTrace();
               }
           }
       }
   }
   ```
   
   After a while, you will get the following logs:
   
   ```
   Message Received [x] 78cab --- [15361:48:-1:990] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:991] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:992] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:993] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:994] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:995] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:996] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:997] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:998] ---- 0
   Message Received [x] 78cab --- [15361:48:-1:999] ---- 0
   [ConsumerBase{subscription='test', consumerName='78cab', 
topic='your-topic'}] 9 messages have timed-out
   [your-topic] [pulsar-api-test-14-3291] Pending messages: 1 --- Publish 
throughput: 7583.21 msg/s --- 0.33 Mbit/s --- Latency: med: 222.155 ms - 95pct: 
587.127 ms - 99pct: 666.942 ms - 99.9pct: 766.028 ms - max: 766.102 ms --- Ack 
received rate: 7566.54 ack/s --- Failed messages: 0
   [your-topic] [test] [2a5ea] Prefetched messages: 0 --- Consume throughput: 
16.45 msgs/s --- Throughput received: 0.00 msg/s --- 0.00 Mbit/s --- Ack sent 
rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {}
   [your-topic] [test] [78cab] Prefetched messages: 0 --- Consume throughput: 
332.48 msgs/s --- Throughput received: 0.00 msg/s --- 0.00 Mbit/s --- Ack sent 
rate: 0 ack/s --- Failed messages: 0 --- Failed acks: {}
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to