This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1649ef47559da2ec8e67c07917efd4f1f99a9743
Author: lipenghui <[email protected]>
AuthorDate: Tue Jul 5 14:31:30 2022 +0800

    [improve][java-client] Improve performance of multi-topic consumer with 
more than one IO thread (#16336)
    
    (cherry picked from commit bdda1ebd6843be6ffd9dff67adcd85ee367f4e93)
---
 .../apache/pulsar/client/impl/MessagesImpl.java    |  6 ++-
 .../client/impl/MultiTopicsConsumerImpl.java       | 60 +++++++++++++---------
 2 files changed, 42 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
index cfd77587344..e0a54c50fed 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.client.api.Messages;
 @NotThreadSafe
 public class MessagesImpl<T> implements Messages<T> {
 
-    private List<Message<T>> messageList;
+    private final List<Message<T>> messageList;
 
     private final int maxNumberOfMessages;
     private final long maxSizeOfMessages;
@@ -80,6 +80,10 @@ public class MessagesImpl<T> implements Messages<T> {
         this.messageList.clear();
     }
 
+    List<Message<T>> getMessageList() {
+        return messageList;
+    }
+
     @Override
     public Iterator<Message<T>> iterator() {
         return  messageList.iterator();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 68c5acbc3d0..1108fd66520 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
@@ -242,19 +243,25 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         if (getState() == State.Ready) {
             newConsumers.forEach(consumer -> {
                 
consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), 
conf.getReceiverQueueSize());
-                internalPinnedExecutor.execute(() -> 
receiveMessageFromConsumer(consumer));
+                internalPinnedExecutor.execute(() -> 
receiveMessageFromConsumer(consumer, true));
             });
         }
     }
 
-    private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
-        consumer.receiveAsync().thenAcceptAsync(message -> {
+    private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean 
batchReceive) {
+        CompletableFuture<List<Message<T>>> messagesFuture;
+        if (batchReceive) {
+            messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> 
((MessagesImpl<T>) msgs).getMessageList());
+        } else {
+            messagesFuture = 
consumer.receiveAsync().thenApply(Collections::singletonList);
+        }
+        messagesFuture.thenAcceptAsync(messages -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Receive message from sub consumer:{}",
                     topic, subscription, consumer.getTopic());
             }
             // Process the message, add to the queue and trigger listener or 
async callback
-            messageReceived(consumer, message);
+            messages.forEach(msg -> messageReceived(consumer, msg));
 
             int size = incomingMessages.size();
             if (size >= maxReceiverQueueSize
@@ -270,7 +277,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             } else {
                 // Call receiveAsync() if the incoming queue is not full. 
Because this block is run with
                 // thenAcceptAsync, there is no chance for recursion that 
would lead to stack overflow.
-                receiveMessageFromConsumer(consumer);
+                receiveMessageFromConsumer(consumer, messages.size() > 0);
             }
         }, internalPinnedExecutor).exceptionally(ex -> {
             if (ex instanceof PulsarClientException.AlreadyClosedException
@@ -279,8 +286,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 return null;
             }
             log.error("Receive operation failed on consumer {} - Retrying 
later", consumer, ex);
-            ((ScheduledExecutorService) client.getScheduledExecutorProvider())
-                    .schedule(() -> receiveMessageFromConsumer(consumer), 10, 
TimeUnit.SECONDS);
+            ((ScheduledExecutorService) 
client.getScheduledExecutorProvider().getExecutor())
+                    .schedule(() -> receiveMessageFromConsumer(consumer, 
true), 10, TimeUnit.SECONDS);
             return null;
         });
     }
@@ -323,7 +330,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 }
 
                 internalPinnedExecutor.execute(() -> {
-                    receiveMessageFromConsumer(consumer);
+                    receiveMessageFromConsumer(consumer, true);
                 });
             }
         }
@@ -1021,11 +1028,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     partitionIndex -> {
                         String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-                        ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, partitionName,
-                                    configurationData, 
client.externalExecutorProvider(),
-                                    partitionIndex, true, listener != null, 
subFuture,
-                                    startMessageId, schema, interceptors,
-                                    createIfDoesNotExist, 
startMessageRollbackDurationInSec);
+                        ConsumerImpl<T> newConsumer = 
createInternalConsumer(configurationData, partitionName,
+                                partitionIndex, subFuture, 
createIfDoesNotExist, schema);
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();
@@ -1048,11 +1052,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     subscribeResult.completeExceptionally(new 
PulsarClientException(errorMessage));
                     return existingValue;
                 } else {
-                    ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
-                            client.externalExecutorProvider(), -1,
-                            true, listener != null, subFuture, startMessageId, 
schema, interceptors,
-                            createIfDoesNotExist, 
startMessageRollbackDurationInSec);
-
+                    ConsumerImpl<T> newConsumer = 
createInternalConsumer(internalConfig, topicName,
+                            -1, subFuture, createIfDoesNotExist, schema);
                     synchronized (pauseMutex) {
                         if (paused) {
                             newConsumer.pause();
@@ -1092,6 +1093,22 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             });
     }
 
+    private ConsumerImpl<T> 
createInternalConsumer(ConsumerConfigurationData<T> configurationData, String 
partitionName,
+                                                   int partitionIndex, 
CompletableFuture<Consumer<T>> subFuture,
+                                                   boolean 
createIfDoesNotExist, Schema<T> schema) {
+        BatchReceivePolicy internalBatchReceivePolicy = 
BatchReceivePolicy.builder()
+                
.maxNumMessages(Math.max(configurationData.getReceiverQueueSize() / 2, 1))
+                .maxNumBytes(-1)
+                .timeout(1, TimeUnit.MILLISECONDS)
+                .build();
+        configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
+        return ConsumerImpl.newConsumerImpl(client, partitionName,
+                configurationData, client.externalExecutorProvider(),
+                partitionIndex, true, listener != null, subFuture,
+                startMessageId, schema, interceptors,
+                createIfDoesNotExist, startMessageRollbackDurationInSec);
+    }
+
     // handling failure during subscribe new topic, unsubscribe success 
created partitions
     private void handleSubscribeOneTopicError(String topicName,
                                               Throwable error,
@@ -1348,11 +1365,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         int partitionIndex = 
TopicName.getPartitionIndex(partitionName);
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
                         ConsumerConfigurationData<T> configurationData = 
getInternalConsumerConfig();
-                        ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(
-                                client, partitionName, configurationData,
-                                client.externalExecutorProvider(),
-                                partitionIndex, true, listener != null, 
subFuture, startMessageId, schema, interceptors,
-                                true /* createTopicIfDoesNotExist */, 
startMessageRollbackDurationInSec);
+                        ConsumerImpl<T> newConsumer = 
createInternalConsumer(configurationData, partitionName,
+                                partitionIndex, subFuture, true, schema);
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();

Reply via email to