This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1649ef47559da2ec8e67c07917efd4f1f99a9743 Author: lipenghui <[email protected]> AuthorDate: Tue Jul 5 14:31:30 2022 +0800 [improve][java-client] Improve performance of multi-topic consumer with more than one IO thread (#16336) (cherry picked from commit bdda1ebd6843be6ffd9dff67adcd85ee367f4e93) --- .../apache/pulsar/client/impl/MessagesImpl.java | 6 ++- .../client/impl/MultiTopicsConsumerImpl.java | 60 +++++++++++++--------- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java index cfd77587344..e0a54c50fed 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java @@ -29,7 +29,7 @@ import org.apache.pulsar.client.api.Messages; @NotThreadSafe public class MessagesImpl<T> implements Messages<T> { - private List<Message<T>> messageList; + private final List<Message<T>> messageList; private final int maxNumberOfMessages; private final long maxSizeOfMessages; @@ -80,6 +80,10 @@ public class MessagesImpl<T> implements Messages<T> { this.messageList.clear(); } + List<Message<T>> getMessageList() { + return messageList; + } + @Override public Iterator<Message<T>> iterator() { return messageList.iterator(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 68c5acbc3d0..1108fd66520 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -49,6 +49,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.lang3.tuple.Pair; +import 
org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerStats; import org.apache.pulsar.client.api.Message; @@ -242,19 +243,25 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { if (getState() == State.Ready) { newConsumers.forEach(consumer -> { consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize()); - internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer)); + internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true)); }); } } - private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) { - consumer.receiveAsync().thenAcceptAsync(message -> { + private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchReceive) { + CompletableFuture<List<Message<T>>> messagesFuture; + if (batchReceive) { + messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> ((MessagesImpl<T>) msgs).getMessageList()); + } else { + messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList); + } + messagesFuture.thenAcceptAsync(messages -> { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Receive message from sub consumer:{}", topic, subscription, consumer.getTopic()); } // Process the message, add to the queue and trigger listener or async callback - messageReceived(consumer, message); + messages.forEach(msg -> messageReceived(consumer, msg)); int size = incomingMessages.size(); if (size >= maxReceiverQueueSize @@ -270,7 +277,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } else { // Call receiveAsync() if the incoming queue is not full. Because this block is run with // thenAcceptAsync, there is no chance for recursion that would lead to stack overflow. 
- receiveMessageFromConsumer(consumer); + receiveMessageFromConsumer(consumer, messages.size() > 0); } }, internalPinnedExecutor).exceptionally(ex -> { if (ex instanceof PulsarClientException.AlreadyClosedException @@ -279,8 +286,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { return null; } log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex); - ((ScheduledExecutorService) client.getScheduledExecutorProvider()) - .schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS); + ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()) + .schedule(() -> receiveMessageFromConsumer(consumer, true), 10, TimeUnit.SECONDS); return null; }); } @@ -323,7 +330,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } internalPinnedExecutor.execute(() -> { - receiveMessageFromConsumer(consumer); + receiveMessageFromConsumer(consumer, true); }); } } @@ -1021,11 +1028,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { partitionIndex -> { String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); - ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName, - configurationData, client.externalExecutorProvider(), - partitionIndex, true, listener != null, subFuture, - startMessageId, schema, interceptors, - createIfDoesNotExist, startMessageRollbackDurationInSec); + ConsumerImpl<T> newConsumer = createInternalConsumer(configurationData, partitionName, + partitionIndex, subFuture, createIfDoesNotExist, schema); synchronized (pauseMutex) { if (paused) { newConsumer.pause(); @@ -1048,11 +1052,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { subscribeResult.completeExceptionally(new PulsarClientException(errorMessage)); return existingValue; } else { - ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, 
topicName, internalConfig, - client.externalExecutorProvider(), -1, - true, listener != null, subFuture, startMessageId, schema, interceptors, - createIfDoesNotExist, startMessageRollbackDurationInSec); - + ConsumerImpl<T> newConsumer = createInternalConsumer(internalConfig, topicName, + -1, subFuture, createIfDoesNotExist, schema); synchronized (pauseMutex) { if (paused) { newConsumer.pause(); @@ -1092,6 +1093,22 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { }); } + private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> configurationData, String partitionName, + int partitionIndex, CompletableFuture<Consumer<T>> subFuture, + boolean createIfDoesNotExist, Schema<T> schema) { + BatchReceivePolicy internalBatchReceivePolicy = BatchReceivePolicy.builder() + .maxNumMessages(Math.max(configurationData.getReceiverQueueSize() / 2, 1)) + .maxNumBytes(-1) + .timeout(1, TimeUnit.MILLISECONDS) + .build(); + configurationData.setBatchReceivePolicy(internalBatchReceivePolicy); + return ConsumerImpl.newConsumerImpl(client, partitionName, + configurationData, client.externalExecutorProvider(), + partitionIndex, true, listener != null, subFuture, + startMessageId, schema, interceptors, + createIfDoesNotExist, startMessageRollbackDurationInSec); + } + // handling failure during subscribe new topic, unsubscribe success created partitions private void handleSubscribeOneTopicError(String topicName, Throwable error, @@ -1348,11 +1365,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { int partitionIndex = TopicName.getPartitionIndex(partitionName); CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig(); - ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl( - client, partitionName, configurationData, - client.externalExecutorProvider(), - partitionIndex, true, listener != null, subFuture, startMessageId, schema, interceptors, 
- true /* createTopicIfDoesNotExist */, startMessageRollbackDurationInSec); + ConsumerImpl<T> newConsumer = createInternalConsumer(configurationData, partitionName, + partitionIndex, subFuture, true, schema); synchronized (pauseMutex) { if (paused) { newConsumer.pause();
