This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bdda1ebd684 [improve][java-client] Improve performance of multi-topic 
consumer with more than one IO thread (#16336)
bdda1ebd684 is described below

commit bdda1ebd6843be6ffd9dff67adcd85ee367f4e93
Author: lipenghui <[email protected]>
AuthorDate: Tue Jul 5 14:31:30 2022 +0800

    [improve][java-client] Improve performance of multi-topic consumer with 
more than one IO thread (#16336)
---
 .../apache/pulsar/client/impl/MessagesImpl.java    |  6 ++-
 .../client/impl/MultiTopicsConsumerImpl.java       | 59 ++++++++++++++--------
 2 files changed, 42 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
index cfd77587344..e0a54c50fed 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.client.api.Messages;
 @NotThreadSafe
 public class MessagesImpl<T> implements Messages<T> {
 
-    private List<Message<T>> messageList;
+    private final List<Message<T>> messageList;
 
     private final int maxNumberOfMessages;
     private final long maxSizeOfMessages;
@@ -80,6 +80,10 @@ public class MessagesImpl<T> implements Messages<T> {
         this.messageList.clear();
     }
 
+    List<Message<T>> getMessageList() {
+        return messageList;
+    }
+
     @Override
     public Iterator<Message<T>> iterator() {
         return  messageList.iterator();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index d40fe0b0e43..986a13e446a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
@@ -238,19 +239,25 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             newConsumers.forEach(consumer -> {
                 
consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(),
                         consumer.getCurrentReceiverQueueSize());
-                internalPinnedExecutor.execute(() -> 
receiveMessageFromConsumer(consumer));
+                internalPinnedExecutor.execute(() -> 
receiveMessageFromConsumer(consumer, true));
             });
         }
     }
 
-    private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
-        consumer.receiveAsync().thenAcceptAsync(message -> {
+    private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean 
batchReceive) {
+        CompletableFuture<List<Message<T>>> messagesFuture;
+        if (batchReceive) {
+            messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> 
((MessagesImpl<T>) msgs).getMessageList());
+        } else {
+            messagesFuture = 
consumer.receiveAsync().thenApply(Collections::singletonList);
+        }
+        messagesFuture.thenAcceptAsync(messages -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Receive message from sub consumer:{}",
                     topic, subscription, consumer.getTopic());
             }
             // Process the message, add to the queue and trigger listener or 
async callback
-            messageReceived(consumer, message);
+            messages.forEach(msg -> messageReceived(consumer, msg));
 
             int size = incomingMessages.size();
             int maxReceiverQueueSize = getCurrentReceiverQueueSize();
@@ -268,7 +275,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             } else {
                 // Call receiveAsync() if the incoming queue is not full. 
Because this block is run with
                 // thenAcceptAsync, there is no chance for recursion that 
would lead to stack overflow.
-                receiveMessageFromConsumer(consumer);
+                receiveMessageFromConsumer(consumer, messages.size() > 0);
             }
         }, internalPinnedExecutor).exceptionally(ex -> {
             if (ex instanceof PulsarClientException.AlreadyClosedException
@@ -277,8 +284,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 return null;
             }
             log.error("Receive operation failed on consumer {} - Retrying 
later", consumer, ex);
-            ((ScheduledExecutorService) client.getScheduledExecutorProvider())
-                    .schedule(() -> receiveMessageFromConsumer(consumer), 10, 
TimeUnit.SECONDS);
+            ((ScheduledExecutorService) 
client.getScheduledExecutorProvider().getExecutor())
+                    .schedule(() -> receiveMessageFromConsumer(consumer, 
true), 10, TimeUnit.SECONDS);
             return null;
         });
     }
@@ -321,7 +328,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 }
 
                 internalPinnedExecutor.execute(() -> {
-                    receiveMessageFromConsumer(consumer);
+                    receiveMessageFromConsumer(consumer, true);
                 });
             }
         }
@@ -1045,11 +1052,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
                         configurationData.setStartPaused(paused);
-                        ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, partitionName,
-                                    configurationData, 
client.externalExecutorProvider(),
-                                    partitionIndex, true, listener != null, 
subFuture,
-                                    startMessageId, schema, interceptors,
-                                    createIfDoesNotExist, 
startMessageRollbackDurationInSec);
+                        ConsumerImpl<T> newConsumer = 
createInternalConsumer(configurationData, partitionName,
+                                partitionIndex, subFuture, 
createIfDoesNotExist, schema);
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();
@@ -1075,10 +1079,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     return existingValue;
                 } else {
                     internalConfig.setStartPaused(paused);
-                    ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
-                            client.externalExecutorProvider(), -1,
-                            true, listener != null, subFuture, startMessageId, 
schema, interceptors,
-                            createIfDoesNotExist, 
startMessageRollbackDurationInSec);
+                    ConsumerImpl<T> newConsumer = 
createInternalConsumer(internalConfig, topicName,
+                            -1, subFuture, createIfDoesNotExist, schema);
 
                     synchronized (pauseMutex) {
                         if (paused) {
@@ -1121,6 +1123,22 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             });
     }
 
+    private ConsumerImpl<T> 
createInternalConsumer(ConsumerConfigurationData<T> configurationData, String 
partitionName,
+                                                   int partitionIndex, 
CompletableFuture<Consumer<T>> subFuture,
+                                                   boolean 
createIfDoesNotExist, Schema<T> schema) {
+        BatchReceivePolicy internalBatchReceivePolicy = 
BatchReceivePolicy.builder()
+                
.maxNumMessages(Math.max(configurationData.getReceiverQueueSize() / 2, 1))
+                .maxNumBytes(-1)
+                .timeout(1, TimeUnit.MILLISECONDS)
+                .build();
+        configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
+        return ConsumerImpl.newConsumerImpl(client, partitionName,
+                configurationData, client.externalExecutorProvider(),
+                partitionIndex, true, listener != null, subFuture,
+                startMessageId, schema, interceptors,
+                createIfDoesNotExist, startMessageRollbackDurationInSec);
+    }
+
     // handling failure during subscribe new topic, unsubscribe success 
created partitions
     private void handleSubscribeOneTopicError(String topicName,
                                               Throwable error,
@@ -1378,11 +1396,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
                         ConsumerConfigurationData<T> configurationData = 
getInternalConsumerConfig();
                         configurationData.setStartPaused(paused);
-                        ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(
-                                client, partitionName, configurationData,
-                                client.externalExecutorProvider(),
-                                partitionIndex, true, listener != null, 
subFuture, startMessageId, schema, interceptors,
-                                true /* createTopicIfDoesNotExist */, 
startMessageRollbackDurationInSec);
+                        ConsumerImpl<T> newConsumer = 
createInternalConsumer(configurationData, partitionName,
+                                partitionIndex, subFuture, true, schema);
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();

Reply via email to