This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch stable in repository https://gitbox.apache.org/repos/asf/pulsar-java-contrib.git
commit 181b75ea4becc00e0208b9b6bff9c0ab8d60ca4b
Author: duanlinlin <[email protected]>
AuthorDate: Sat Jul 27 12:02:04 2024 +0800

    feat[MessageListener]: add CommonMessageListenerExecutor, PartitionOrderMessageListenerExecutor
---
 .../api/impl/CommonMessageListenerExecutor.java    | 51 ++++++++++++++++++++++++++++++++++
 .../PartitionOrderMessageListenerExecutor.java     | 45 ++++++++++++++++++++++++++++++
 2 files changed, 96 insertions(+)

diff --git a/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/CommonMessageListenerExecutor.java b/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/CommonMessageListenerExecutor.java
new file mode 100644
index 0000000..acb5898
--- /dev/null
+++ b/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/CommonMessageListenerExecutor.java
@@ -0,0 +1,51 @@
+package org.apache.pulsar.client.api.impl;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListenerExecutor;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link MessageListenerExecutor} that runs every listener task on one shared, fixed-size thread pool.
+ *
+ * The {@code message} argument is not used to pick a thread, so tasks for the same topic may run
+ * on different threads.
+ */
+public class CommonMessageListenerExecutor implements MessageListenerExecutor {
+    private final ExecutorService executorService;
+
+    /**
+     * Creates the shared pool.
+     *
+     * @param numThreads number of threads in the pool (used as both core and maximum pool size)
+     * @param subscriptionName prefix for worker thread names, which are built as
+     *     {@code subscriptionName + "-listener-executor-" + n} with {@code n} starting at 1
+     */
+    public CommonMessageListenerExecutor(int numThreads, String subscriptionName) {
+        this.executorService = new ThreadPoolExecutor(numThreads, numThreads, 10000L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            private final AtomicInteger threadId = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, subscriptionName + "-listener-executor-" + threadId.incrementAndGet());
+            }
+        });
+    }
+
+    /**
+     * Submits {@code runnable} to the shared pool.
+     *
+     * @param message the message the task belongs to; ignored by this implementation
+     * @param runnable the listener task to run
+     */
+    @Override
+    public void execute(Message<?> message, Runnable runnable) {
+        this.executorService.execute(runnable);
+    }
+}
diff --git a/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/PartitionOrderMessageListenerExecutor.java b/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/PartitionOrderMessageListenerExecutor.java
new file mode 100644
index 0000000..0fb27cf
--- /dev/null
+++ b/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/PartitionOrderMessageListenerExecutor.java
@@ -0,0 +1,45 @@
+package org.apache.pulsar.client.api.impl;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListenerExecutor;
+import org.apache.pulsar.client.util.ExecutorProvider;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * {@link MessageListenerExecutor} that picks the executor for each listener task from the message's
+ * topic name, so that tasks for the same partition topic are handed to the same executor.
+ */
+public class PartitionOrderMessageListenerExecutor implements MessageListenerExecutor {
+    private final ExecutorProvider executorProvider;
+
+    /**
+     * Creates the underlying {@link ExecutorProvider}.
+     *
+     * @param numThreads thread count passed to the provider
+     * @param subscriptionName prefix for the pool name, built as {@code subscriptionName + "-listener-executor-"}
+     */
+    public PartitionOrderMessageListenerExecutor(int numThreads, String subscriptionName) {
+        // Same "-listener-executor-" suffix as CommonMessageListenerExecutor; the leading separator keeps
+        // the subscription name readable in thread names.
+        this.executorProvider = new ExecutorProvider(numThreads, subscriptionName + "-listener-executor-");
+    }
+
+    /**
+     * Runs {@code runnable} on the executor selected by the message's topic name.
+     *
+     * @param message the message whose topic name selects the executor
+     * @param runnable the listener task to run
+     */
+    @Override
+    public void execute(Message<?> message, Runnable runnable) {
+        // Select an executor by partition topic name, so listener tasks from the same partition
+        // topic are executed by the same thread. Encode with an explicit charset so the key bytes
+        // do not depend on the JVM's default charset.
+        ExecutorService executorService =
+                executorProvider.getExecutor(message.getTopicName().getBytes(StandardCharsets.UTF_8));
+        // NOTE(review): each executor returned by ExecutorProvider is expected to be single-threaded; confirm.
+        executorService.execute(runnable);
+    }
+}
