This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch stable
in repository https://gitbox.apache.org/repos/asf/pulsar-java-contrib.git

commit 181b75ea4becc00e0208b9b6bff9c0ab8d60ca4b
Author: duanlinlin <[email protected]>
AuthorDate: Sat Jul 27 12:02:04 2024 +0800

    feat[MessageListener]: add CommonMessageListenerExecutor, 
PartitionOrderMessageListenerExecutor
---
 .../api/impl/CommonMessageListenerExecutor.java    | 33 ++++++++++++++++++++++
 .../PartitionOrderMessageListenerExecutor.java     | 26 +++++++++++++++++
 2 files changed, 59 insertions(+)

diff --git 
a/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/CommonMessageListenerExecutor.java
 
b/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/CommonMessageListenerExecutor.java
new file mode 100644
index 0000000..acb5898
--- /dev/null
+++ 
b/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/CommonMessageListenerExecutor.java
@@ -0,0 +1,33 @@
+package org.apache.pulsar.client.api.impl;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListenerExecutor;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CommonMessageListenerExecutor implements MessageListenerExecutor {
+    private final ExecutorService executorService;
+
+    public CommonMessageListenerExecutor(int numThreads, String 
subscriptionName) {
+        this.executorService = new ThreadPoolExecutor(numThreads, numThreads, 
10000L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+                    private final AtomicInteger threadId = new 
AtomicInteger(0);
+
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        return new Thread(r, subscriptionName + 
"-listener-executor-" + threadId.incrementAndGet());
+                    }
+                });
+    }
+
+    @Override
+    public void execute(Message<?> message, Runnable runnable) {
+        this.executorService.execute(runnable);
+    }
+}
diff --git 
a/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/PartitionOrderMessageListenerExecutor.java
 
b/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/PartitionOrderMessageListenerExecutor.java
new file mode 100644
index 0000000..0fb27cf
--- /dev/null
+++ 
b/pulsar-client-contrib/src/main/java/org/apache/pulsar/client/api/impl/PartitionOrderMessageListenerExecutor.java
@@ -0,0 +1,26 @@
+package org.apache.pulsar.client.api.impl;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListenerExecutor;
+import org.apache.pulsar.client.util.ExecutorProvider;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+
+public class PartitionOrderMessageListenerExecutor implements 
MessageListenerExecutor {
+    private final ExecutorProvider executorProvider;
+
+    public PartitionOrderMessageListenerExecutor(int numThreads, String 
subscriptionName) {
+        this.executorProvider = new ExecutorProvider(numThreads, 
subscriptionName + "listener-executor-");
+    }
+
+    @Override
+    public void execute(Message<?> message, Runnable runnable) {
+        // select a thread by partition topic name to execute the runnable!
+        // that say, the message listener task from the same partition topic
+        // will be executed by the same thread
+        ExecutorService executorService = 
executorProvider.getExecutor(message.getTopicName().getBytes());
+        // executorService is a SingleThreadExecutor
+        executorService.execute(runnable);
+    }
+}

Reply via email to