BewareMyPower edited a comment on pull request #10982:
URL: https://github.com/apache/pulsar/pull/10982#issuecomment-865685162


   @newur It's a synchronous vs. asynchronous compare. The synchronous way that 
uses `receive` method is easier to use for newbies. I've seen many newbies that 
tried to use the asynchronous way that sets the listener while they didn't 
realize that they should do something in the main thread.
   
   ```java
               Consumer<String> consumer = client.newConsumer(Schema.STRING)
                       .subscriptionName("my-sub")
                       .topic("my-topic")
                       .messageListener(new MessageListener<String>() {
                           @Override
                           public void received(Consumer<String> consumer, 
Message<String> msg) {
                               System.out.println("Receive " + msg.getValue() + 
" from " + msg.getMessageId());
                           }
                       })
                       .subscribe();
               // TODO: do something else
               // If you don't do anything to block here, the consumer will 
close immediately.
               consumer.close();
   ```
   
   Some newbies might think when they set the listener, the consumer will block 
until some state. You may think they're stupid, but the "stupid" guys exist. 
(Just want to say not everyone is good at the asynchronous programming)
   
   Asynchronous programming is more complicated than synchronous programming. 
Given a case that you need to collect 10 messages to a list and then close the 
consumer. Using `receive` is easy.
   
   Using listener is somehow complicated:
   
   ```java
               final CountDownLatch latch = new CountDownLatch(10);
               final List<String> messages = Collections.synchronizedList(new 
ArrayList<>()); // [1]
               Consumer<String> consumer = client.newConsumer(Schema.STRING)
                       .subscriptionName("my-sub")
                       .topic("my-topic")
                       .messageListener(new MessageListener<String>() {
                           @Override
                           public void received(Consumer<String> consumer, 
Message<String> msg) {
                               if (messages.size() < 10) { // [2]
                                   messages.add(msg.getValue());
                                   consumer.acknowledgeAsync(msg); // just to 
avoid catching the exception
                               }
                               latch.countDown();
                           }
                       })
                       .subscribe();
               latch.await(); // [3]
               messages.forEach(System.out::println); // Users may do something 
more complicated here
               consumer.close();
   ```
   
   1. You should use a thread-safe container because you'll access these 
messages in both main thread and listener thread.
   2. Since you cannot control whether to stop calling the listener, you need 
to check the count of received messages in case the consumer received more than 
10 messages internally.
   3. You need an extra component like `CountDownLatch`  for the 
synchronization between the main thread and the listener thread.
   
   While using `receive` method is much easier:
   
   ```java
               final List<String> messages = new ArrayList<>();
               Consumer<String> consumer = client.newConsumer(Schema.STRING)
                       
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                       .subscriptionName("my-sub")
                       .topic("my-topic")
                       .subscribe();
               for (int i = 0; i < 10; i++) {
                   // NOTE: If you're using receive() with timeout, you must 
check null for timeout
                   messages.add(consumer.receive().getValue());
               }
               messages.forEach(System.out::println);
               consumer.close();
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to