This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 10f4e0248f0 [improve][client]PIP-359:Support custom message listener
executor for specific subscription (#22861)
10f4e0248f0 is described below
commit 10f4e0248f0f985b1dc7ad38970c906b7fe629be
Author: Aurora Twinkle <[email protected]>
AuthorDate: Mon Aug 5 19:25:17 2024 +0800
[improve][client]PIP-359:Support custom message listener executor for
specific subscription (#22861)
Co-authored-by: duanlinlin <[email protected]>
[PIP-359](https://github.com/apache/pulsar/pull/22902)
Support custom message listener thread pool for specific subscription,
avoid individual subscription listener consuming too much time leading to
higher consumption delay in other subscriptions.
<!--
### Contribution Checklist
- PR title format should be *[type][component] summary*. For details, see
*[Guideline - Pulsar PR Naming
Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*.
- Fill out the template below to describe the changes contributed by the
pull request. That will give reviewers the context they need to do the review.
- Each pull request should address only one issue, not mix up code from
multiple issues.
- Each commit in the pull request has a meaningful commit message
- Once all items of the checklist are addressed, remove the above text
and this checklist, leaving only the filled out template below.
-->
<!-- Either this PR fixes an issue, -->
<!-- or this PR is one task of an issue -->
<!-- If the PR belongs to a PIP, please add the PIP link here -->
<!-- Details of when a PIP is required and how the PIP process work, please
see: https://github.com/apache/pulsar/blob/master/pip/README.md -->
### Motivation
In our scenario, there is a centralized message proxy service, this service
will use the same PulsarClient instance to create a lot of subscription groups
to consume many topics and cache messages locally.Then the business will pull
messages from the cache of the proxy service. It seems that there is no
problem, but during use, we found that when the
message processing time of several consumer groups (listener mode) is very
high, it almost affects all consumer groups responsible for the proxy service,
causing a large number of message delays.
By analyzing the source code, we found that by default, all consumer
instances created from the same PulsarClient will share a thread pool to
process message listeners, and sometimes there are multiple consumer message
listeners bound to the same thread. Obviously, when a consumer processes
messages and causes long-term blocking, it will cause the messages of other
consumers bound to the thread to fail to be processed in time, resulting in
message delays. Therefore, for this scenario, [...]
<!-- Explain here the context, and why you're making that change. What is
the problem you're trying to solve. -->
### Modifications
Support custom message listener thread pool for specific subscription.
<!-- Describe the modifications you've done. -->
---
.../client/api/MessageListenerExecutorTest.java | 193 +++++++++++++++++++++
.../apache/pulsar/client/api/ConsumerBuilder.java | 15 ++
.../pulsar/client/api/MessageListenerExecutor.java | 43 +++++
.../apache/pulsar/client/impl/ConsumerBase.java | 28 +--
.../pulsar/client/impl/ConsumerBuilderImpl.java | 8 +
.../impl/conf/ConsumerConfigurationData.java | 3 +
6 files changed, 280 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageListenerExecutorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageListenerExecutorTest.java
new file mode 100644
index 00000000000..9e148beb304
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageListenerExecutorTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertTrue;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Cleanup;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.naming.TopicName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class MessageListenerExecutorTest extends ProducerConsumerBase {
+ private static final Logger log =
LoggerFactory.getLogger(MessageListenerExecutorTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder
clientBuilder) {
+ // Set listenerThreads to 1 to reproduce the pr more easily in #22861
+ clientBuilder.listenerThreads(1);
+ }
+
+ @Test
+ public void testConsumerMessageListenerExecutorIsolation() throws
Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newCachedThreadPool();
+ List<CompletableFuture<Long>>
maxConsumeDelayWithDisableIsolationFutures = new ArrayList<>();
+ int loops = 5;
+ long consumeSleepTimeMs = 10000;
+ for (int i = 0; i < loops; i++) {
+ // The first consumer will consume messages with sleep block 1s,
+ // and the others will consume messages without sleep block.
+ // The maxConsumeDelayWithDisableIsolation of all consumers
+ // should be greater than sleepTimeMs cause by disable
MessageListenerExecutor.
+ CompletableFuture<Long> maxConsumeDelayFuture =
startConsumeAndComputeMaxConsumeDelay(
+
"persistent://my-property/my-ns/testConsumerMessageListenerDisableIsolation-" +
i,
+ "my-sub-testConsumerMessageListenerDisableIsolation-" + i,
+ i == 0 ? Duration.ofMillis(consumeSleepTimeMs) :
Duration.ofMillis(0),
+ false,
+ executor);
+
maxConsumeDelayWithDisableIsolationFutures.add(maxConsumeDelayFuture);
+ }
+
+ // ensure all consumers consume messages delay more than
consumeSleepTimeMs
+ boolean allDelayMoreThanConsumeSleepTimeMs =
maxConsumeDelayWithDisableIsolationFutures.stream()
+ .map(CompletableFuture::join)
+ .allMatch(delay -> delay > consumeSleepTimeMs);
+ assertTrue(allDelayMoreThanConsumeSleepTimeMs);
+
+ List<CompletableFuture<Long>>
maxConsumeDelayWhitEnableIsolationFutures = new ArrayList<>();
+ for (int i = 0; i < loops; i++) {
+ // The first consumer will consume messages with sleep block 1s,
+ // and the others will consume messages without sleep block.
+ // The maxConsumeDelayWhitEnableIsolation of the first consumer
+ // should be greater than sleepTimeMs, and the others should be
+ // less than sleepTimeMs, cause by enable MessageListenerExecutor.
+ CompletableFuture<Long> maxConsumeDelayFuture =
startConsumeAndComputeMaxConsumeDelay(
+
"persistent://my-property/my-ns/testConsumerMessageListenerEnableIsolation-" +
i,
+ "my-sub-testConsumerMessageListenerEnableIsolation-" + i,
+ i == 0 ? Duration.ofMillis(consumeSleepTimeMs) :
Duration.ofMillis(0),
+ true,
+ executor);
+
maxConsumeDelayWhitEnableIsolationFutures.add(maxConsumeDelayFuture);
+ }
+
+ assertTrue(maxConsumeDelayWhitEnableIsolationFutures.get(0).join() >
consumeSleepTimeMs);
+ boolean remainingAlmostNoDelay =
maxConsumeDelayWhitEnableIsolationFutures.stream()
+ .skip(1)
+ .map(CompletableFuture::join)
+ .allMatch(delay -> delay < 1000);
+ assertTrue(remainingAlmostNoDelay);
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ private CompletableFuture<Long>
startConsumeAndComputeMaxConsumeDelay(String topic, String subscriptionName,
+
Duration consumeSleepTime,
+
boolean enableMessageListenerExecutorIsolation,
+
ExecutorService executorService)
+ throws Exception {
+ int numMessages = 2;
+ final CountDownLatch latch = new CountDownLatch(numMessages);
+ int numPartitions = 50;
+ TopicName nonIsolationTopicName = TopicName.get(topic);
+
admin.topics().createPartitionedTopic(nonIsolationTopicName.toString(),
numPartitions);
+
+ AtomicLong maxConsumeDelay = new AtomicLong(-1);
+ ConsumerBuilder<Long> consumerBuilder =
+ pulsarClient.newConsumer(Schema.INT64)
+ .topic(nonIsolationTopicName.toString())
+ .subscriptionName(subscriptionName)
+ .messageListener((c1, msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be
null");
+ log.debug("Received message [{}] in the listener",
msg.getValue());
+ c1.acknowledgeAsync(msg);
+ maxConsumeDelay.set(Math.max(maxConsumeDelay.get(),
+ System.currentTimeMillis() -
msg.getValue()));
+ if (consumeSleepTime.toMillis() > 0) {
+
Uninterruptibles.sleepUninterruptibly(consumeSleepTime);
+ }
+ latch.countDown();
+ });
+
+ ExecutorService executor = Executors.newSingleThreadExecutor(
+ new ExecutorProvider.ExtendedThreadFactory(subscriptionName +
"listener-executor-", true));
+ if (enableMessageListenerExecutorIsolation) {
+ consumerBuilder.messageListenerExecutor((message, runnable) ->
executor.execute(runnable));
+ }
+
+ Consumer<Long> consumer = consumerBuilder.subscribe();
+ ProducerBuilder<Long> producerBuilder =
pulsarClient.newProducer(Schema.INT64)
+ .topic(nonIsolationTopicName.toString());
+
+ Producer<Long> producer = producerBuilder.create();
+ List<Future<MessageId>> futures = new ArrayList<>();
+
+ // Asynchronously produce messages
+ for (int i = 0; i < numMessages; i++) {
+ Future<MessageId> future =
producer.sendAsync(System.currentTimeMillis());
+ futures.add(future);
+ }
+
+ log.info("Waiting for async publish to complete");
+ for (Future<MessageId> future : futures) {
+ future.get();
+ }
+
+ CompletableFuture<Long> maxDelayFuture = new CompletableFuture<>();
+
+ CompletableFuture.runAsync(() -> {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }, executorService).whenCompleteAsync((v, ex) -> {
+ maxDelayFuture.complete(maxConsumeDelay.get());
+ try {
+ producer.close();
+ consumer.close();
+ executor.shutdownNow();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ return maxDelayFuture;
+ }
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index c7919fa473f..1b2e5cc5a5e 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -283,6 +283,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> messageListener(MessageListener<T> messageListener);
+ /**
+ * Set the {@link MessageListenerExecutor} to be used for message
listeners of <b>current consumer</b>.
+ * <i>(default: use executor from PulsarClient,
+ * {@link
org.apache.pulsar.client.impl.PulsarClientImpl#externalExecutorProvider})</i>.
+ *
+ * <p>The listener thread pool is exclusively owned by current consumer
+ * that are using a "listener" model to get messages. For a given internal
consumer,
+ * the listener will always be invoked from the same thread, to ensure
ordering.
+ *
+ * <p> The caller need to shut down the thread pool after closing the
consumer to avoid leaks.
+ * @param messageListenerExecutor the executor of the consumer message
listener
+ * @return the consumer builder instance
+ */
+ ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor
messageListenerExecutor);
+
/**
* Sets a {@link CryptoKeyReader}.
*
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageListenerExecutor.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageListenerExecutor.java
new file mode 100644
index 00000000000..53bb828c05a
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageListenerExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Interface for providing service to execute message listeners.
+ */
+public interface MessageListenerExecutor {
+
+ /**
+ * select a thread by message to execute the runnable!
+ * <p>
+ * Suggestions:
+ * <p>
+ * 1. The message listener task will be submitted to this executor for
execution,
+ * so the implementations of this interface should carefully consider
execution
+ * order if sequential consumption is required.
+ * </p>
+ * <p>
+ * 2. The users should release resources(e.g. threads) of the executor
after closing
+ * the consumer to avoid leaks.
+ * </p>
+ * @param message the message
+ * @param runnable the runnable to execute, that is, the message listener
task
+ */
+ void execute(Message<?> message, Runnable runnable);
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 74abb82bfe8..9748a42f0cb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessageListenerExecutor;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
@@ -82,6 +83,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected final MessageListener<T> listener;
protected final ConsumerEventListener consumerEventListener;
protected final ExecutorProvider executorProvider;
+ protected final MessageListenerExecutor messageListenerExecutor;
protected final ExecutorService externalPinnedExecutor;
protected final ExecutorService internalPinnedExecutor;
protected UnAckedMessageTracker unAckedMessageTracker;
@@ -139,6 +141,11 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
this.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdAdv,
MessageIdImpl[]>newBuilder().build();
this.executorProvider = executorProvider;
+ this.messageListenerExecutor = conf.getMessageListenerExecutor() ==
null
+ ? (conf.getSubscriptionType() == SubscriptionType.Key_Shared
+ ? this::executeKeySharedMessageListener
+ : this::executeMessageListener)
+ : conf.getMessageListenerExecutor();
this.externalPinnedExecutor = executorProvider.getExecutor();
this.internalPinnedExecutor = client.getInternalExecutorService();
this.pendingReceives = Queues.newConcurrentLinkedQueue();
@@ -1127,14 +1134,7 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
// internal pinned executor thread while the message
processing happens
final Message<T> finalMsg = msg;
MESSAGE_LISTENER_QUEUE_SIZE_UPDATER.incrementAndGet(this);
- if (SubscriptionType.Key_Shared ==
conf.getSubscriptionType()) {
-
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
- callMessageListener(finalMsg));
- } else {
- getExternalExecutor(msg).execute(() -> {
- callMessageListener(finalMsg);
- });
- }
+ messageListenerExecutor.execute(msg, () ->
callMessageListener(finalMsg));
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Message has been cleared from
the queue", topic, subscription);
@@ -1147,6 +1147,14 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
});
}
+ private void executeMessageListener(Message<?> message, Runnable runnable)
{
+ getExternalExecutor(message).execute(runnable);
+ }
+
+ private void executeKeySharedMessageListener(Message<?> message, Runnable
runnable) {
+
executorProvider.getExecutor(peekMessageKey(message)).execute(runnable);
+ }
+
protected void callMessageListener(Message<T> msg) {
try {
if (log.isDebugEnabled()) {
@@ -1176,7 +1184,7 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
}
static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8);
- protected byte[] peekMessageKey(Message<T> msg) {
+ protected byte[] peekMessageKey(Message<?> msg) {
byte[] key = NONE_KEY;
if (msg.hasKey()) {
key = msg.getKeyBytes();
@@ -1243,7 +1251,7 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
- private ExecutorService getExternalExecutor(Message<T> msg) {
+ private ExecutorService getExternalExecutor(Message<?> msg) {
ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ?
((TopicMessageImpl) msg).receivedByconsumer
: null;
ExecutorService executor = receivedConsumer != null &&
receivedConsumer.externalPinnedExecutor != null
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 4d6cf96a010..7197cf6be79 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessageListenerExecutor;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
@@ -299,6 +300,13 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
return this;
}
+ @Override
+ public ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor
messageListenerExecutor) {
+ checkArgument(messageListenerExecutor != null,
"messageListenerExecutor needs to be not null");
+ conf.setMessageListenerExecutor(messageListenerExecutor);
+ return this;
+ }
+
@Override
public ConsumerBuilder<T> consumerEventListener(@NonNull
ConsumerEventListener consumerEventListener) {
conf.setConsumerEventListener(consumerEventListener);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 18529276c9c..f9ff5913f62 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessageListenerExecutor;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -90,6 +91,8 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;
+ @JsonIgnore
+ private transient MessageListenerExecutor messageListenerExecutor;
@JsonIgnore
private MessageListener<T> messageListener;