BewareMyPower commented on issue #8846:
URL: https://github.com/apache/pulsar/issues/8846#issuecomment-740629832


   @ltamber The cost of flexibility is the complexity. It's not like 
`sendAsync` vs. `send`, `receive(Message&, int)` is easy and enough to use in 
most cases. The only case to use `receiveAsync` is to wait multiple messages.
   
   If you're using Java client, when you choose `receiveAsync`, the logic will 
still be complicated.
   
   Let's start with a simple client code:
   
   ```java
           try (PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
               Consumer<byte[]> consumer = client.newConsumer()
                       .topic("my-topic")
                       .subscriptionName("my-sub")
                       .subscribe();
               for (int i = 0; i < 100000/* just to represent a large number 
*/; i++) {
                   consumer.receiveAsync().whenComplete((msg, e) -> {
                       if (e == null) {
                           // TODO: process `msg`
                       } else {
                           // TODO: handle error
                       }
                   });
               }
           } catch (PulsarClientException e) {
               e.printStackTrace();
           }
   ```
   
   The issue is the same. If there're no messages, the pending receives queue 
will grow infinitely until OOM.
   
   Then you need to manage these futures, e.g.
   
   ```java
               ArrayBlockingQueue<CompletableFuture<Message<byte[]>>> queue = 
new ArrayBlockingQueue<>(1000);
               for (int i = 0; i < 100000; i++) {
                   CompletableFuture<Message<byte[]>> future = 
consumer.receiveAsync();
                   queue.put(future); // it blocks when the queue is full
                   future.whenComplete((msg, e) -> {
                       if (e == null) {
                           // TODO: process `msg`
                       } else {
                           // TODO: handle error
                       }
                       queue.poll(); // just to release a space
                   });
               }
   ```
   
   So it's like the first C++ example. The code looks more simple just because 
`ArrayBlockingQueue` has already encapsulated the conditional variable. Also,
   
   As for the second C++ example, instead of using `queue.put(future)`:
   
   ```java
                   if (!queue.offer(future)) {
                       // TODO: handle the queue is full error to make sure the 
next queue.add won't fail
                       queue.add(future);
                   }
   ```
   
   You can just wait for a while, or after #8326 , you can call `cancel` here 
to remove all pending receives:
   
   ```java
                   if (!queue.offer(future)) {
                       queue.forEach(futureInQueue -> 
futureInQueue.cancel(true));
                       queue.add(future);
                   }
   ```
   
   The code looks more simple but it's just because Java standard library is 
more strong. You still have to handle the corner cases from a client side 
instead of relying on the library.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to