BewareMyPower edited a comment on pull request #10982:
URL: https://github.com/apache/pulsar/pull/10982#issuecomment-865685162
@newur It's a comparison of the synchronous and asynchronous approaches. The
synchronous approach, which uses the `receive` method, is easier for newcomers.
I've seen many newcomers try the asynchronous approach by setting a listener
without realizing that the main thread still has to do something to keep the
consumer alive.
```java
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .subscriptionName("my-sub")
        .topic("my-topic")
        .messageListener(new MessageListener<String>() {
            @Override
            public void received(Consumer<String> consumer, Message<String> msg) {
                System.out.println("Receive " + msg.getValue()
                        + " from " + msg.getMessageId());
            }
        })
        .subscribe();
// TODO: do something else
// If you don't do anything to block here, the consumer will close immediately.
consumer.close();
```
Some newcomers assume that once the listener is set, the consumer will block
until some state is reached. That assumption may look naive, but people do make
it; not everyone is comfortable with asynchronous programming.
Asynchronous programming is more complicated than synchronous programming.
Take the case where you need to collect 10 messages into a list and then close
the consumer. With `receive` this is easy.
With a listener it is somewhat more complicated:
```java
final CountDownLatch latch = new CountDownLatch(10);
final List<String> messages = Collections.synchronizedList(new ArrayList<>()); // [1]
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .subscriptionName("my-sub")
        .topic("my-topic")
        .messageListener(new MessageListener<String>() {
            @Override
            public void received(Consumer<String> consumer, Message<String> msg) {
                if (messages.size() < 10) { // [2]
                    messages.add(msg.getValue());
                    // acknowledgeAsync is used just to avoid catching the exception
                    consumer.acknowledgeAsync(msg);
                }
                latch.countDown();
            }
        })
        .subscribe();
latch.await(); // [3]
// Users may do something more complicated here
messages.forEach(System.out::println);
consumer.close();
```
1. You should use a thread-safe container because the messages are accessed
from both the main thread and the listener thread.
2. You cannot stop the client from invoking the listener, so you need to check
the number of collected messages in case the consumer receives more than 10
messages internally.
3. You need an extra component such as `CountDownLatch` to synchronize the
main thread with the listener thread.
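The three points above can be tried without a broker. The following is a minimal sketch that uses only the JDK: `ListenerSketch`, `collect`, and the `msg-N` payloads are hypothetical names for illustration, and the single-thread executor only stands in for the client's listener thread; it is not how Pulsar dispatches messages internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical stand-in for a listener-based consumer; not part of the Pulsar API.
class ListenerSketch {

    // Collects the first `wanted` of `available` messages pushed by a background thread.
    static List<String> collect(int wanted, int available) {
        final CountDownLatch latch = new CountDownLatch(wanted);
        final List<String> messages =
                Collections.synchronizedList(new ArrayList<>()); // [1]

        // This callback plays the role of MessageListener.received().
        final Consumer<String> listener = msg -> {
            if (messages.size() < wanted) { // [2] ignore deliveries beyond the target
                messages.add(msg);
            }
            latch.countDown(); // has no effect once the count has reached zero
        };

        // One background thread plays the role of the listener thread (assumption).
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        listenerThread.submit(() -> {
            for (int i = 0; i < available; i++) {
                listener.accept("msg-" + i);
            }
        });

        try {
            latch.await(); // [3] the main thread must block explicitly
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            listenerThread.shutdown(); // lets the running task finish, then stops
        }

        // Copy while holding the list's lock before handing the result out.
        synchronized (messages) {
            return new ArrayList<>(messages);
        }
    }

    public static void main(String[] args) {
        collect(10, 25).forEach(System.out::println);
    }
}
```

Note that `collect` blocks forever if fewer than `wanted` messages arrive, which is one more thing the listener approach leaves to the caller.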
Using the `receive` method is much easier:
```java
final List<String> messages = new ArrayList<>();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName("my-sub")
        .topic("my-topic")
        .subscribe();
for (int i = 0; i < 10; i++) {
    // NOTE: If you're using receive() with timeout, you must check null for timeout
    messages.add(consumer.receive().getValue());
}
messages.forEach(System.out::println);
consumer.close();
```
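The NOTE in the loop above concerns the timed variant of `receive`, which returns null when the timeout elapses. Below is a rough sketch of that null check, using the JDK's `BlockingQueue.poll(timeout, unit)` as a stand-in because it also returns null on timeout. `TimedReceiveSketch` and `collect` are hypothetical names, and stopping at the first timeout is one possible policy, not a recommendation from the Pulsar client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: the queue plays the role of the consumer, and
// poll(timeout, unit) plays the role of a receive call with a timeout.
class TimedReceiveSketch {

    // Collects up to `wanted` messages and stops early on the first timeout.
    static List<String> collect(BlockingQueue<String> source, int wanted, long timeoutMs) {
        List<String> messages = new ArrayList<>();
        try {
            while (messages.size() < wanted) {
                String msg = source.poll(timeoutMs, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break; // timed out: nothing arrived within timeoutMs
                }
                messages.add(msg);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return messages;
    }

    public static void main(String[] args) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        for (int i = 0; i < 3; i++) {
            source.add("msg-" + i);
        }
        // Only 3 messages are available, so the 4th poll times out.
        System.out.println(collect(source, 10, 100)); // prints [msg-0, msg-1, msg-2]
    }
}
```

Without the null check, the equivalent of `getValue()` on the timed-out result would throw a `NullPointerException`.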