This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 1ac258746e5 [improve][java-client] Improve performance of multi-topic 
consumer with more than one IO thread (#16336)
1ac258746e5 is described below

commit 1ac258746e542283626f7b4c7a02f0e1aad95abe
Author: lipenghui <[email protected]>
AuthorDate: Tue Jul 5 14:31:30 2022 +0800

    [improve][java-client] Improve performance of multi-topic consumer with 
more than one IO thread (#16336)
    
    (cherry picked from commit bdda1ebd6843be6ffd9dff67adcd85ee367f4e93)
---
 .../apache/pulsar/client/impl/MessagesImpl.java    |  6 ++-
 .../client/impl/MultiTopicsConsumerImpl.java       | 59 ++++++++++++++--------
 2 files changed, 42 insertions(+), 23 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
index bb0335b1f15..532d152a831 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
@@ -30,7 +30,7 @@ import java.util.List;
 @NotThreadSafe
 public class MessagesImpl<T> implements Messages<T> {
 
-    private List<Message<T>> messageList;
+    private final List<Message<T>> messageList;
 
     private final int maxNumberOfMessages;
     private final long maxSizeOfMessages;
@@ -81,6 +81,10 @@ public class MessagesImpl<T> implements Messages<T> {
         this.messageList.clear();
     }
 
+    List<Message<T>> getMessageList() {
+        return messageList;
+    }
+
     @Override
     public Iterator<Message<T>> iterator() {
         return  messageList.iterator();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 057eda176bd..e5cc32af8fe 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
@@ -239,13 +240,19 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         if (getState() == State.Ready) {
             newConsumers.forEach(consumer -> {
                 
consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), 
conf.getReceiverQueueSize());
-                internalPinnedExecutor.execute(() -> 
receiveMessageFromConsumer(consumer));
+                internalPinnedExecutor.execute(() -> 
receiveMessageFromConsumer(consumer, true));
             });
         }
     }
 
-    private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
-        consumer.receiveAsync().thenAcceptAsync(message -> {
+    private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean 
batchReceive) {
+        CompletableFuture<List<Message<T>>> messagesFuture;
+        if (batchReceive) {
+            messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> 
((MessagesImpl<T>) msgs).getMessageList());
+        } else {
+            messagesFuture = 
consumer.receiveAsync().thenApply(Collections::singletonList);
+        }
+        messagesFuture.thenAcceptAsync(messages -> {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Receive message from sub consumer:{}",
                     topic, subscription, consumer.getTopic());
@@ -255,7 +262,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 return;
             }
             // Process the message, add to the queue and trigger listener or 
async callback
-            messageReceived(consumer, message);
+            messages.forEach(msg -> messageReceived(consumer, msg));
 
             int size = incomingMessages.size();
             if (size >= maxReceiverQueueSize
@@ -271,7 +278,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             } else {
                 // Call receiveAsync() if the incoming queue is not full. 
Because this block is run with
                 // thenAcceptAsync, there is no chance for recursion that 
would lead to stack overflow.
-                receiveMessageFromConsumer(consumer);
+                receiveMessageFromConsumer(consumer, messages.size() > 0);
             }
         }, internalPinnedExecutor).exceptionally(ex -> {
             if (ex instanceof PulsarClientException.AlreadyClosedException
@@ -280,8 +287,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 return null;
             }
             log.error("Receive operation failed on consumer {} - Retrying 
later", consumer, ex);
-            ((ScheduledExecutorService) client.getScheduledExecutorProvider())
-                    .schedule(() -> receiveMessageFromConsumer(consumer), 10, 
TimeUnit.SECONDS);
+            ((ScheduledExecutorService) 
client.getScheduledExecutorProvider().getExecutor())
+                    .schedule(() -> receiveMessageFromConsumer(consumer, 
true), 10, TimeUnit.SECONDS);
             return null;
         });
     }
@@ -324,7 +331,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 }
 
                 internalPinnedExecutor.execute(() -> {
-                    receiveMessageFromConsumer(consumer);
+                    receiveMessageFromConsumer(consumer, true);
                 });
             }
         }
@@ -979,11 +986,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
                         configurationData.setStartPaused(paused);
-                        ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, partitionName,
-                                    configurationData, 
client.externalExecutorProvider(),
-                                    partitionIndex, true, listener != null, 
subFuture,
-                                    startMessageId, schema, interceptors,
-                                    createIfDoesNotExist, 
startMessageRollbackDurationInSec);
+                        ConsumerImpl<T> newConsumer = 
createInternalConsumer(configurationData, partitionName,
+                                partitionIndex, subFuture, 
createIfDoesNotExist, schema);
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();
@@ -1009,10 +1013,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     return existingValue;
                 } else {
                     internalConfig.setStartPaused(paused);
-                    ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
-                            client.externalExecutorProvider(), -1,
-                            true, listener != null, subFuture, startMessageId, 
schema, interceptors,
-                            createIfDoesNotExist, 
startMessageRollbackDurationInSec);
+                    ConsumerImpl<T> newConsumer = 
createInternalConsumer(internalConfig, topicName,
+                            -1, subFuture, createIfDoesNotExist, schema);
 
                     synchronized (pauseMutex) {
                         if (paused) {
@@ -1054,6 +1056,22 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             });
     }
 
+    private ConsumerImpl<T> 
createInternalConsumer(ConsumerConfigurationData<T> configurationData, String 
partitionName,
+                                                   int partitionIndex, 
CompletableFuture<Consumer<T>> subFuture,
+                                                   boolean 
createIfDoesNotExist, Schema<T> schema) {
+        BatchReceivePolicy internalBatchReceivePolicy = 
BatchReceivePolicy.builder()
+                
.maxNumMessages(Math.max(configurationData.getReceiverQueueSize() / 2, 1))
+                .maxNumBytes(-1)
+                .timeout(1, TimeUnit.MILLISECONDS)
+                .build();
+        configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
+        return ConsumerImpl.newConsumerImpl(client, partitionName,
+                configurationData, client.externalExecutorProvider(),
+                partitionIndex, true, listener != null, subFuture,
+                startMessageId, schema, interceptors,
+                createIfDoesNotExist, startMessageRollbackDurationInSec);
+    }
+
     // handling failure during subscribe new topic, unsubscribe success 
created partitions
     private void handleSubscribeOneTopicError(String topicName, Throwable 
error, CompletableFuture<Void> subscribeFuture) {
         log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer 
{}", topic, topicName, error.getMessage());
@@ -1311,11 +1329,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
                         ConsumerConfigurationData<T> configurationData = 
getInternalConsumerConfig();
                         configurationData.setStartPaused(paused);
-                        ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(
-                                client, partitionName, configurationData,
-                                client.externalExecutorProvider(),
-                                partitionIndex, true, listener != null, 
subFuture, startMessageId, schema, interceptors,
-                                true /* createTopicIfDoesNotExist */, 
startMessageRollbackDurationInSec);
+                        ConsumerImpl<T> newConsumer = 
createInternalConsumer(configurationData, partitionName,
+                                partitionIndex, subFuture, true, schema);
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();

Reply via email to