This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bdda1ebd684 [improve][java-client] Improve performance of multi-topic
consumer with more than one IO thread (#16336)
bdda1ebd684 is described below
commit bdda1ebd6843be6ffd9dff67adcd85ee367f4e93
Author: lipenghui <[email protected]>
AuthorDate: Tue Jul 5 14:31:30 2022 +0800
[improve][java-client] Improve performance of multi-topic consumer with
more than one IO thread (#16336)
---
.../apache/pulsar/client/impl/MessagesImpl.java | 6 ++-
.../client/impl/MultiTopicsConsumerImpl.java | 59 ++++++++++++++--------
2 files changed, 42 insertions(+), 23 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
index cfd77587344..e0a54c50fed 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.client.api.Messages;
@NotThreadSafe
public class MessagesImpl<T> implements Messages<T> {
- private List<Message<T>> messageList;
+ private final List<Message<T>> messageList;
private final int maxNumberOfMessages;
private final long maxSizeOfMessages;
@@ -80,6 +80,10 @@ public class MessagesImpl<T> implements Messages<T> {
this.messageList.clear();
}
+ List<Message<T>> getMessageList() {
+ return messageList;
+ }
+
@Override
public Iterator<Message<T>> iterator() {
return messageList.iterator();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index d40fe0b0e43..986a13e446a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -49,6 +49,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
@@ -238,19 +239,25 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
newConsumers.forEach(consumer -> {
consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(),
consumer.getCurrentReceiverQueueSize());
- internalPinnedExecutor.execute(() ->
receiveMessageFromConsumer(consumer));
+ internalPinnedExecutor.execute(() ->
receiveMessageFromConsumer(consumer, true));
});
}
}
- private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
- consumer.receiveAsync().thenAcceptAsync(message -> {
+ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean
batchReceive) {
+ CompletableFuture<List<Message<T>>> messagesFuture;
+ if (batchReceive) {
+ messagesFuture = consumer.batchReceiveAsync().thenApply(msgs ->
((MessagesImpl<T>) msgs).getMessageList());
+ } else {
+ messagesFuture =
consumer.receiveAsync().thenApply(Collections::singletonList);
+ }
+ messagesFuture.thenAcceptAsync(messages -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Receive message from sub consumer:{}",
topic, subscription, consumer.getTopic());
}
// Process the message, add to the queue and trigger listener or
async callback
- messageReceived(consumer, message);
+ messages.forEach(msg -> messageReceived(consumer, msg));
int size = incomingMessages.size();
int maxReceiverQueueSize = getCurrentReceiverQueueSize();
@@ -268,7 +275,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
} else {
// Call receiveAsync() if the incoming queue is not full.
Because this block is run with
// thenAcceptAsync, there is no chance for recursion that
would lead to stack overflow.
- receiveMessageFromConsumer(consumer);
+ receiveMessageFromConsumer(consumer, messages.size() > 0);
}
}, internalPinnedExecutor).exceptionally(ex -> {
if (ex instanceof PulsarClientException.AlreadyClosedException
@@ -277,8 +284,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return null;
}
log.error("Receive operation failed on consumer {} - Retrying
later", consumer, ex);
- ((ScheduledExecutorService) client.getScheduledExecutorProvider())
- .schedule(() -> receiveMessageFromConsumer(consumer), 10,
TimeUnit.SECONDS);
+ ((ScheduledExecutorService)
client.getScheduledExecutorProvider().getExecutor())
+ .schedule(() -> receiveMessageFromConsumer(consumer,
true), 10, TimeUnit.SECONDS);
return null;
});
}
@@ -321,7 +328,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
internalPinnedExecutor.execute(() -> {
- receiveMessageFromConsumer(consumer);
+ receiveMessageFromConsumer(consumer, true);
});
}
}
@@ -1045,11 +1052,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
String partitionName =
TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
configurationData.setStartPaused(paused);
- ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(client, partitionName,
- configurationData,
client.externalExecutorProvider(),
- partitionIndex, true, listener != null,
subFuture,
- startMessageId, schema, interceptors,
- createIfDoesNotExist,
startMessageRollbackDurationInSec);
+ ConsumerImpl<T> newConsumer =
createInternalConsumer(configurationData, partitionName,
+ partitionIndex, subFuture,
createIfDoesNotExist, schema);
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();
@@ -1075,10 +1079,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return existingValue;
} else {
internalConfig.setStartPaused(paused);
- ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
- client.externalExecutorProvider(), -1,
- true, listener != null, subFuture, startMessageId,
schema, interceptors,
- createIfDoesNotExist,
startMessageRollbackDurationInSec);
+ ConsumerImpl<T> newConsumer =
createInternalConsumer(internalConfig, topicName,
+ -1, subFuture, createIfDoesNotExist, schema);
synchronized (pauseMutex) {
if (paused) {
@@ -1121,6 +1123,22 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
});
}
+ private ConsumerImpl<T>
createInternalConsumer(ConsumerConfigurationData<T> configurationData, String
partitionName,
+ int partitionIndex,
CompletableFuture<Consumer<T>> subFuture,
+ boolean
createIfDoesNotExist, Schema<T> schema) {
+ BatchReceivePolicy internalBatchReceivePolicy =
BatchReceivePolicy.builder()
+
.maxNumMessages(Math.max(configurationData.getReceiverQueueSize() / 2, 1))
+ .maxNumBytes(-1)
+ .timeout(1, TimeUnit.MILLISECONDS)
+ .build();
+ configurationData.setBatchReceivePolicy(internalBatchReceivePolicy);
+ return ConsumerImpl.newConsumerImpl(client, partitionName,
+ configurationData, client.externalExecutorProvider(),
+ partitionIndex, true, listener != null, subFuture,
+ startMessageId, schema, interceptors,
+ createIfDoesNotExist, startMessageRollbackDurationInSec);
+ }
+
// handling failure during subscribe new topic, unsubscribe success
created partitions
private void handleSubscribeOneTopicError(String topicName,
Throwable error,
@@ -1378,11 +1396,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
ConsumerConfigurationData<T> configurationData =
getInternalConsumerConfig();
configurationData.setStartPaused(paused);
- ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(
- client, partitionName, configurationData,
- client.externalExecutorProvider(),
- partitionIndex, true, listener != null,
subFuture, startMessageId, schema, interceptors,
- true /* createTopicIfDoesNotExist */,
startMessageRollbackDurationInSec);
+ ConsumerImpl<T> newConsumer =
createInternalConsumer(configurationData, partitionName,
+ partitionIndex, subFuture, true, schema);
synchronized (pauseMutex) {
if (paused) {
newConsumer.pause();