This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit 7e3579fca7f5ad582547e883ab6f91d8fd859cf9
Author: changfeng <[email protected]>
AuthorDate: Thu Apr 13 11:43:12 2023 +0800

    new implementation of RocketMQEventSubscriber
---
 adapter/runtimer/pom.xml                           |   8 +-
 .../boot/listener/RocketMQEventSubscriber.java     | 134 +++++----
 .../boot/listener/consumer/ClientConfig.java       | 113 ++++++++
 .../boot/listener/consumer/ConsumeRequest.java     |  34 +++
 .../listener/consumer/ExponentialRetryPolicy.java  |  69 +++++
 .../boot/listener/consumer/LitePullConsumer.java   |  29 ++
 .../listener/consumer/LitePullConsumerImpl.java    | 299 +++++++++++++++++++++
 .../boot/listener/consumer/LocalMessageCache.java  | 157 +++++++++++
 .../boot/listener/consumer/RetryPolicy.java        |   9 +
 9 files changed, 797 insertions(+), 55 deletions(-)

diff --git a/adapter/runtimer/pom.xml b/adapter/runtimer/pom.xml
index 1e7c7ed..dd8a653 100644
--- a/adapter/runtimer/pom.xml
+++ b/adapter/runtimer/pom.xml
@@ -40,7 +40,13 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
-            <version>4.9.2</version>
+            <version>5.1.0</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-acl</artifactId>
+            <version>5.1.0</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
index 1e504c1..10d25ac 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -18,9 +18,10 @@
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
 
 import com.alibaba.fastjson.JSON;
-import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.gson.Gson;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordOffset;
@@ -30,23 +31,34 @@ import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.ClientConfig;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.LitePullConsumer;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.LitePullConsumerImpl;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.RefreshTypeEnum;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.proxy.SocksProxyConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.io.support.PropertiesLoaderUtils;
 
 import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -56,16 +68,17 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
 
     private static final Logger logger = 
LoggerFactory.getLogger(RocketMQEventSubscriber.class);
 
-    private DefaultLitePullConsumer pullConsumer;
+    private LitePullConsumer pullConsumer;
 
     private final TargetRunnerConfigObserver runnerConfigObserver;
 
     private Integer pullTimeOut;
-
-    private String namesrvAddr;
-
     private Integer pullBatchSize;
 
+    private ClientConfig clientConfig;
+    private SessionCredentials sessionCredentials;
+    private String socksProxy;
+
     private static final String SEMICOLON = ";";
 
     private static final String SYS_DEFAULT_GROUP = 
"event-bridge-default-group";
@@ -81,7 +94,7 @@ public class RocketMQEventSubscriber extends EventSubscriber {
     @Override
     public void refresh(TargetRunnerConfig targetRunnerConfig, RefreshTypeEnum 
refreshTypeEnum) {
         if(Objects.isNull(pullConsumer)){
-            pullConsumer = initDefaultMQPullConsumer();
+            pullConsumer = initLitePullConsumer();
             return;
         }
         Set<String> currentTopics = 
parseTopicsByRunnerConfigs(Sets.newHashSet(targetRunnerConfig));
@@ -102,7 +115,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
 
     @Override
     public List<ConnectRecord> pull() {
-        List<MessageExt> messages = pullConsumer.poll(pullTimeOut);
+        List<MessageExt> messages = pullConsumer.poll(pullBatchSize, 
Duration.ofSeconds(pullTimeOut));
         if (CollectionUtils.isEmpty(messages)) {
             logger.info("consumer poll message empty , consumer - {}", 
JSON.toJSONString(pullConsumer));
             return null;
@@ -156,10 +169,43 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
      */
     private void initMqProperties() {
         try {
+            ClientConfig clientConfig = new ClientConfig();
             Properties properties = 
PropertiesLoaderUtils.loadAllProperties("runtimer.properties");
-            namesrvAddr = properties.getProperty("rocketmq.namesrvAddr");
+            String namesrvAddr = 
properties.getProperty("rocketmq.namesrvAddr");
+            String consumerGroup = 
properties.getProperty("rocketmq.consumerGroup");
             pullTimeOut = 
Integer.valueOf(properties.getProperty("rocketmq.consumer.pullTimeOut"));
             pullBatchSize = 
Integer.valueOf(properties.getProperty("rocketmq.consumer.pullBatchSize"));
+            String accessChannel = 
properties.getProperty("rocketmq.accessChannel");
+            String namespace = properties.getProperty("rocketmq.namespace");
+            String accessKey = 
properties.getProperty("rocketmq.consumer.accessKey");
+            String secretKey = 
properties.getProperty("rocketmq.consumer.secretKey");
+            String socks5UserName = 
properties.getProperty("rocketmq.consumer.socks5UserName");
+            String socks5Password = 
properties.getProperty("rocketmq.consumer.socks5Password");
+            String socks5Endpoint = 
properties.getProperty("rocketmq.consumer.socks5Endpoint");
+
+            clientConfig.setNameSrvAddr(namesrvAddr);
+            clientConfig.setConsumerGroup(StringUtils.isBlank(consumerGroup) ?
+                    createGroupName(SYS_DEFAULT_GROUP) : consumerGroup);
+            
clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ?
+                    AccessChannel.CLOUD : AccessChannel.LOCAL);
+            clientConfig.setNamespace(namespace);
+            this.clientConfig = clientConfig;
+
+            if (StringUtils.isNotBlank(accessKey) && 
StringUtils.isNotBlank(secretKey)) {
+                this.sessionCredentials = new SessionCredentials(accessKey, 
secretKey);
+            }
+
+            if (StringUtils.isNotBlank(socks5UserName) && 
StringUtils.isNotBlank(socks5Password)
+                    && StringUtils.isNotBlank(socks5Endpoint)) {
+                SocksProxyConfig proxyConfig = new SocksProxyConfig();
+                proxyConfig.setUsername(socks5UserName);
+                proxyConfig.setPassword(socks5Password);
+                proxyConfig.setAddr(socks5Endpoint);
+                Map<String, SocksProxyConfig> proxyConfigMap = 
Maps.newHashMap();
+                proxyConfigMap.put("0.0.0.0/0", proxyConfig);
+                this.socksProxy = new Gson().toJson(proxyConfigMap);
+            }
+
         }catch (Exception exception){
             logger.error("init rocket mq property exception, stack trace-", 
exception);
         }
@@ -169,67 +215,51 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
      * init rocket mq pull consumer
      */
     private void initPullConsumer() {
-        pullConsumer = initDefaultMQPullConsumer();
+        pullConsumer = initLitePullConsumer();
     }
 
     /**
      * first init default rocketmq pull consumer
      * @return
      */
-    public DefaultLitePullConsumer initDefaultMQPullConsumer () {
+    public LitePullConsumer initLitePullConsumer() {
+        if (pullConsumer != null) {
+            try {
+                pullConsumer.shutdown();
+            } catch (Exception e) {
+                logger.error("Consumer shutdown failed.", e);
+            }
+        }
+
         Set<TargetRunnerConfig> targetRunnerConfigs = 
runnerConfigObserver.getTargetRunnerConfig();
         Set<String> topics = parseTopicsByRunnerConfigs(targetRunnerConfigs);
-        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
-        consumer.setConsumerGroup(createGroupName(SYS_DEFAULT_GROUP));
-        consumer.setNamesrvAddr(namesrvAddr);
-        consumer.setPullBatchSize(pullBatchSize);
+
+        RPCHook rpcHook = this.sessionCredentials != null ? new 
AclClientRPCHook(this.sessionCredentials) : null;
+        this.pullConsumer = new LitePullConsumerImpl(this.clientConfig, 
rpcHook);
+        if (StringUtils.isNotBlank(this.socksProxy)) {
+            pullConsumer.setSockProxyJson(this.socksProxy);
+        }
         try {
             for(String topic : topics){
-                consumer.subscribe(topic, "*");
+                pullConsumer.attachTopic(topic, "*");
             }
-            consumer.start();
+            pullConsumer.startup();
         } catch (Exception exception) {
             logger.error("init default pull consumer exception, topic -" + 
topics.toString() + "-stackTrace-", exception);
             throw new EventBridgeException(" init rocketmq consumer failed");
         }
-        return consumer;
+        return pullConsumer;
     }
 
     private String createGroupName(String prefix) {
         StringBuilder sb = new StringBuilder();
         sb.append(prefix).append("-");
-        sb.append(RemotingUtil.getLocalAddress()).append("-");
+        sb.append(NetworkUtil.getLocalAddress()).append("-");
         sb.append(UtilAll.getPid()).append("-");
         sb.append(System.nanoTime());
         return sb.toString().replace(".", "-");
     }
 
-    private String createInstance(String servers) {
-        String[] serversArray = servers.split(";");
-        List<String> serversList = new ArrayList<String>();
-        for (String server : serversArray) {
-            if (!serversList.contains(server)) {
-                serversList.add(server);
-            }
-        }
-        Collections.sort(serversList);
-        return String.valueOf(serversList.toString().hashCode());
-    }
-
-    /**
-     * parse msg queue by queue json
-     *
-     * @param messageQueueStr
-     * @return
-     */
-    private MessageQueue parseMessageQueueList(String messageQueueStr) {
-        List<String> messageQueueStrList = 
Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
-        if (CollectionUtils.isEmpty(messageQueueStrList) || 
messageQueueStrList.size() != 3) {
-            return null;
-        }
-        return new MessageQueue(messageQueueStrList.get(0), 
messageQueueStrList.get(1), Integer.valueOf(messageQueueStrList.get(2)));
-    }
-
     /**
      * MessageExt convert to connect record
      * @param messageExt
@@ -281,11 +311,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
      * @param topic
      */
     private void subscribe(String topic) {
-        try {
-            pullConsumer.subscribe(topic, "*");
-        } catch (MQClientException exception) {
-            logger.error("rocketmq event subscribe new topic failed, stack 
trace - ", exception);
-        }
+        pullConsumer.subscribe(topic);
     }
 
     /**
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java
new file mode 100644
index 0000000..255fdfd
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java
@@ -0,0 +1,113 @@
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+
+import org.apache.rocketmq.client.AccessChannel;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+
+/**
+ * @Author changfeng
+ * @Date 2023/4/9 10:08 上午
+ */
+public class ClientConfig {
+    private int rmqPullMessageCacheCapacity = 1000;
+    private int rmqPullMessageBatchNums = 20;
+    private ConsumeFromWhere consumeFromWhere = 
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+    private long consumeTimestamp = System.currentTimeMillis();
+    private String nameSrvAddr;
+    private String namespace;
+    private String consumerGroup;
+    private int pullInterval = 0;
+    // All the offsets will be committed in the commit thread if enable this 
flag.
+    // To avoid too many rpc calls, disable it and rely on the inner offset 
automatic commit mechanism
+    private boolean commitSync = false;
+    private String routeHookEndpoint;
+    private AccessChannel accessChannel;
+
+    public int getRmqPullMessageCacheCapacity() {
+        return rmqPullMessageCacheCapacity;
+    }
+
+    public void setRmqPullMessageCacheCapacity(final int capacity) {
+        this.rmqPullMessageCacheCapacity = capacity;
+    }
+
+    public int getRmqPullMessageBatchNums() {
+        return rmqPullMessageBatchNums;
+    }
+
+    public void setRmqPullMessageBatchNums(final int nums) {
+        this.rmqPullMessageBatchNums = nums;
+    }
+
+    public ConsumeFromWhere getConsumeFromWhere() {
+        return consumeFromWhere;
+    }
+
+    public void setConsumeFromWhere(
+            final ConsumeFromWhere consumeFromWhere) {
+        this.consumeFromWhere = consumeFromWhere;
+    }
+
+    public long getConsumeTimestamp() {
+        return consumeTimestamp;
+    }
+
+    public void setConsumeTimestamp(final long consumeTimestamp) {
+        this.consumeTimestamp = consumeTimestamp;
+    }
+
+    public String getNameSrvAddr() {
+        return nameSrvAddr;
+    }
+
+    public void setNameSrvAddr(final String nameSrvAddr) {
+        this.nameSrvAddr = nameSrvAddr;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(final String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public int getPullInterval() {
+        return pullInterval;
+    }
+
+    public void setPullInterval(final int pullInterval) {
+        this.pullInterval = pullInterval;
+    }
+
+    public boolean isCommitSync() {
+        return commitSync;
+    }
+
+    public void setCommitSync(final boolean commitSync) {
+        this.commitSync = commitSync;
+    }
+
+    public String getRouteHookEndpoint() {
+        return routeHookEndpoint;
+    }
+
+    public void setRouteHookEndpoint(final String routeHookEndpoint) {
+        this.routeHookEndpoint = routeHookEndpoint;
+    }
+
+    public AccessChannel getAccessChannel() {
+        return accessChannel;
+    }
+
+    public void setAccessChannel(AccessChannel accessChannel) {
+        this.accessChannel = accessChannel;
+    }
+}
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java
new file mode 100644
index 0000000..ac949ea
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java
@@ -0,0 +1,34 @@
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * @Author changfeng
+ * @Date 2023/4/9 10:07 上午
+ */
+public class ConsumeRequest {
+    private final MessageExt messageExt;
+    private final MessageQueue messageQueue;
+    private final ProcessQueue processQueue;
+
+    public ConsumeRequest(final MessageExt messageExt, final MessageQueue 
messageQueue,
+                          final ProcessQueue processQueue) {
+        this.messageExt = messageExt;
+        this.messageQueue = messageQueue;
+        this.processQueue = processQueue;
+    }
+
+    public MessageExt getMessageExt() {
+        return messageExt;
+    }
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+    public ProcessQueue getProcessQueue() {
+        return processQueue;
+    }
+}
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java
new file mode 100644
index 0000000..9ba9316
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java
@@ -0,0 +1,69 @@
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @Author changfeng
+ * @Date 2023/4/9 10:11 上午
+ */
+public class ExponentialRetryPolicy implements RetryPolicy {
+    private long initial = TimeUnit.SECONDS.toMillis(5);
+    private long max = TimeUnit.HOURS.toMillis(2);
+    private long multiplier = 2;
+    private int retryCount = 0;
+
+    public ExponentialRetryPolicy() {
+    }
+
+    public ExponentialRetryPolicy(long initial, long max, long multiplier) {
+        this.initial = initial;
+        this.max = max;
+        this.multiplier = multiplier;
+    }
+
+    public long getInitial() {
+        return initial;
+    }
+
+    public void setInitial(long initial) {
+        this.initial = initial;
+    }
+
+    public long getMax() {
+        return max;
+    }
+
+    public void setMax(long max) {
+        this.max = max;
+    }
+
+    public long getMultiplier() {
+        return multiplier;
+    }
+
+    public void setMultiplier(long multiplier) {
+        this.multiplier = multiplier;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("initial", initial)
+                .add("max", max)
+                .add("multiplier", multiplier)
+                .toString();
+    }
+
+    @Override
+    public long nextDelayDuration() {
+        if (retryCount < 0) {
+            retryCount = 0;
+        }
+        if (retryCount > 32) {
+            retryCount = 32;
+        }
+        return Math.min(max, initial * (long) Math.pow(multiplier, 
retryCount++));
+    }
+}
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java
new file mode 100644
index 0000000..d2e2671
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java
@@ -0,0 +1,29 @@
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * @Author changfeng
+ * @Date 2023/4/9 10:09 上午
+ */
+public interface LitePullConsumer {
+    void startup() throws MQClientException;
+
+    void shutdown();
+
+    void attachTopic(String topic, String tag);
+
+    List<MessageExt> poll(int pullBatchSize, Duration timeout);
+
+    void commit(List<String> messageIdList);
+
+    void setSockProxyJson(String proxyJson);
+
+    void subscribe(String topic);
+
+    void unsubscribe(String topic);
+}
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java
new file mode 100644
index 0000000..0f887cc
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java
@@ -0,0 +1,299 @@
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @Author changfeng
+ * @Date 2023/4/9 10:10 上午
+ */
+public class LitePullConsumerImpl implements LitePullConsumer {
+    private static final Logger log = 
LoggerFactory.getLogger(LitePullConsumerImpl.class);
+    private final DefaultMQPullConsumer rocketmqPullConsumer;
+    private final LocalMessageCache localMessageCache;
+    private final ConcurrentHashMap<MessageQueue, ExponentialRetryPolicy> 
retryPolicies;
+    private final ClientConfig clientConfig;
+    private final Map<MessageQueue, ProcessQueue> runningQueueMap = new 
ConcurrentHashMap<>();
+    private final ScheduledExecutorService scheduleService = new 
ScheduledThreadPoolExecutor(1,
+            ThreadUtils.newThreadFactory("PullConsumerScheduleService", 
false));
+    private final ExecutorService executorService = 
Executors.newSingleThreadExecutor(
+            ThreadUtils.newThreadFactory("PullConsumerExecutorService", 
false));
+
+    public LitePullConsumerImpl(final ClientConfig clientConfig, final RPCHook 
rpcHook) {
+        this.clientConfig = clientConfig;
+        rocketmqPullConsumer = new 
DefaultMQPullConsumer(clientConfig.getConsumerGroup(), rpcHook);
+        rocketmqPullConsumer.setNamesrvAddr(clientConfig.getNameSrvAddr());
+        rocketmqPullConsumer.setAllocateMessageQueueStrategy(new 
AllocateMessageQueueAveragelyByCircle());
+        
rocketmqPullConsumer.setInstanceName(buildInstanceName(clientConfig.getNameSrvAddr(),
 clientConfig.getConsumerGroup()));
+        if (clientConfig.getAccessChannel() != null) {
+            
rocketmqPullConsumer.setAccessChannel(clientConfig.getAccessChannel());
+        }
+        if (StringUtils.isNotBlank(clientConfig.getNamespace())) {
+            rocketmqPullConsumer.setNamespace(clientConfig.getNamespace());
+        }
+        localMessageCache = new LocalMessageCache(rocketmqPullConsumer, 
clientConfig);
+        retryPolicies = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void startup() throws MQClientException {
+        rocketmqPullConsumer.start();
+        log.info("RocketmqPullConsumer start.");
+    }
+
+    @Override
+    public void shutdown() {
+        rocketmqPullConsumer.shutdown();
+        shutdownThreadPool(executorService);
+        shutdownThreadPool(scheduleService);
+    }
+
+    private void shutdownThreadPool(ExecutorService executor) {
+        if (executor != null) {
+            executor.shutdown();
+            try {
+                executor.awaitTermination(60, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                log.error("Shutdown threadPool failed", e);
+            }
+            if (!executor.isTerminated()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    @Override
+    public void attachTopic(final String topic, final String tag) {
+        rocketmqPullConsumer.setRegisterTopics(new 
HashSet<>(Collections.singletonList(topic)));
+        rocketmqPullConsumer.registerMessageQueueListener(topic, new 
MessageQueueListener() {
+            @Override
+            public void messageQueueChanged(String topic, Set<MessageQueue> 
mqAll, Set<MessageQueue> mqDivided) {
+                submitPullTask(topic, tag, mqDivided);
+                localMessageCache.shrinkPullOffsetTable(mqDivided);
+                retryPolicies.entrySet().removeIf(next -> 
!mqDivided.contains(next.getKey()));
+                log.info("Load balance result of topic {} changed, mqAll {}, 
mqDivided {}.", topic, mqAll, mqDivided);
+            }
+        });
+    }
+
+    @Override
+    public void subscribe(final String topic) {
+        
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().subscriptionAutomatically(topic);
+
+    }
+
+    @Override
+    public void unsubscribe(final String topic) {
+        rocketmqPullConsumer.getDefaultMQPullConsumerImpl().unsubscribe(topic);
+    }
+
+    @Override
+    public List<MessageExt> poll(final int pullBatchSize, final Duration 
timeout) {
+        return localMessageCache.poll(pullBatchSize, timeout);
+    }
+
+    @Override
+    public void commit(final List<String> messageIdList) {
+        localMessageCache.commit(messageIdList);
+    }
+
+    @Override
+    public void setSockProxyJson(final String proxyJson) {
+        rocketmqPullConsumer.setSocksProxyConfig(proxyJson);
+    }
+
+    private String buildInstanceName(String nameSrvAddr, String consumerGroup) 
{
+        return UtilAll.getPid()
+                + "#" + nameSrvAddr.hashCode()
+                + "#" + consumerGroup
+                + "#" + System.nanoTime();
+    }
+
+    private RetryPolicy getRetryPolicy(MessageQueue messageQueue) {
+        return retryPolicies.computeIfAbsent(messageQueue, queue ->
+                new ExponentialRetryPolicy(Duration.ofMillis(100).toMillis(), 
Duration.ofMinutes(10).toMillis(), 2));
+    }
+
+    private void removeRetryPolicy(MessageQueue messageQueue) {
+        retryPolicies.remove(messageQueue);
+    }
+
+    private void submitPullTask(String topic, String tag, Set<MessageQueue> 
assignedQueues) {
+        Set<MessageQueue> runningQueues = runningQueueMap.keySet();
+        for (MessageQueue runningQueue : runningQueues) {
+            if (runningQueue == null || 
!assignedQueues.contains(runningQueue)) {
+                ProcessQueue processQueue = 
runningQueueMap.remove(runningQueue);
+                if (processQueue != null) {
+                    processQueue.setDropped(true);
+                }
+            }
+        }
+        if (CollectionUtils.isEmpty(assignedQueues)) {
+            log.warn("Not found any messageQueue, topic:{}", topic);
+            return;
+        }
+
+        for (MessageQueue messageQueue : assignedQueues) {
+            ProcessQueue processQueue = 
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
+                    .getProcessQueueTable().get(messageQueue);
+            if (runningQueueMap.putIfAbsent(messageQueue, processQueue) == 
null) {
+                try {
+                    PullTask pullTask = new PullTask(messageQueue, tag);
+                    executorService.submit(pullTask);
+                    log.info("Submit pullTask:{}", messageQueue);
+                } catch (Exception e) {
+                    log.error("Failed submit pullTask:{}, {}, wait next 
balancing", topic, messageQueue, e);
+                    // 添加pull失败,等待下次 rebalance
+                    processQueue = 
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
+                            .getProcessQueueTable().remove(messageQueue);
+                    if (processQueue != null) {
+                        processQueue.setDropped(true);
+                    }
+                    runningQueueMap.remove(messageQueue);
+                }
+            }
+        }
+
+    }
+
+    void pullLater(PullTask pullTask, long delay, TimeUnit unit) {
+        scheduleService.schedule(new Runnable() {
+            @Override
+            public void run() {
+                pullTask.run();
+            }
+        }, delay, unit);
+    }
+
+    class PullTask implements Runnable {
+        private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
+
+        private final String tag;
+        private final MessageQueue messageQueue;
+
+        public PullTask(MessageQueue messageQueue, String tag) {
+            this.messageQueue = messageQueue;
+            this.tag = tag;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if 
(!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState()))
 {
+                    log.warn("RocketmqPullConsumer not running, pullTask 
exit.");
+                    return;
+                }
+                ProcessQueue processQueue = 
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
+                        .getProcessQueueTable().get(messageQueue);
+                if (processQueue == null || processQueue.isDropped()) {
+                    log.info("ProcessQueue {} dropped, pullTask exit", 
messageQueue);
+                    return;
+                }
+                long offset = localMessageCache.nextPullOffset(messageQueue);
+                int batchNums = localMessageCache.nextPullBatchNums();
+                // If batchNums is zero, an exception will be thrown and then 
trigger a delay
+                if (batchNums <= 0) {
+                    final int delayTimeMillis = (int) 
getRetryPolicy(messageQueue).nextDelayDuration();
+                    log.warn("Local cache is full, delay the pull task {} ms 
for message queue {}",
+                            delayTimeMillis, messageQueue);
+                    pullLater(PullTask.this, delayTimeMillis, 
TimeUnit.MILLISECONDS);
+                }
+
+                rocketmqPullConsumer.pullBlockIfNotFound(this.messageQueue, 
this.tag, offset, batchNums, new PullCallback() {
+                    @Override
+                    public void onSuccess(PullResult pullResult) {
+                        try {
+                            if 
(!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState()))
 {
+                                log.warn("rocketmqPullConsumer not running, 
pullTask exit.");
+                                return;
+                            }
+
+                            ProcessQueue pq = 
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
+                                    .getProcessQueueTable().get(messageQueue);
+                            switch (pullResult.getPullStatus()) {
+                                case FOUND:
+                                    if (pq != null && !pq.isDropped()) {
+                                        
pq.putMessage(pullResult.getMsgFoundList());
+                                        for (final MessageExt messageExt : 
pullResult.getMsgFoundList()) {
+                                            
localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, 
messageQueue, pq));
+                                        }
+                                        
localMessageCache.updatePullOffset(messageQueue, 
pullResult.getNextBeginOffset());
+                                        removeRetryPolicy(messageQueue);
+                                        executorService.submit(PullTask.this);
+                                    } else {
+                                        
localMessageCache.removePullOffset(messageQueue);
+                                        log.info("ProcessQueue {} dropped, 
discard the pulled message.", messageQueue);
+                                        removeRetryPolicy(messageQueue);
+                                    }
+                                    break;
+                                case OFFSET_ILLEGAL:
+                                    final int delayTimeMillis = (int) 
getRetryPolicy(messageQueue).nextDelayDuration();
+                                    log.warn("The pull request offset is 
illegal, offset is {}, message queue is {}, " +
+                                                    "pull result is {}, delay 
{} ms for next pull",
+                                            offset, messageQueue, pullResult, 
delayTimeMillis);
+                                    
localMessageCache.updatePullOffset(messageQueue, 
pullResult.getNextBeginOffset());
+                                    pullLater(PullTask.this, delayTimeMillis, 
TimeUnit.MILLISECONDS);
+                                    break;
+                                case NO_NEW_MSG:
+                                case NO_MATCHED_MSG:
+                                    log.info("No NEW_MSG or MATCHED_MSG for 
mq:{}, pull again.", messageQueue);
+                                    
localMessageCache.updatePullOffset(messageQueue, 
pullResult.getNextBeginOffset());
+                                    removeRetryPolicy(messageQueue);
+                                    executorService.submit(PullTask.this);
+                                    break;
+                                default:
+                                    log.warn("Failed to process pullResult, 
mq:{} {}", messageQueue, pullResult);
+                                    break;
+                            }
+                        } catch (Throwable t) {
+                            log.error("Exception occurs when process 
pullResult", t);
+                            pullLater(PullTask.this, 
PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS);
+                        }
+                    }
+
+                    @Override
+                    public void onException(Throwable e) {
+                        final int delayTimeMillis = (int) 
getRetryPolicy(messageQueue).nextDelayDuration();
+                        log.error("Exception happens when pull message 
process, delay {} ms for message queue {}",
+                                delayTimeMillis, messageQueue, e);
+                        pullLater(PullTask.this, delayTimeMillis, 
TimeUnit.MILLISECONDS);
+                    }
+                });
+            } catch (Throwable t) {
+                final int delayTimeMillis = (int) 
getRetryPolicy(messageQueue).nextDelayDuration();
+                log.error("Error occurs when pull message process, delay {} ms 
for message queue {}",
+                        delayTimeMillis, messageQueue, t);
+                pullLater(PullTask.this, delayTimeMillis, 
TimeUnit.MILLISECONDS);
+            }
+        }
+
+    }
+}
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java
new file mode 100644
index 0000000..4068fa1
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java
@@ -0,0 +1,157 @@
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * @Author changfeng
+ * @Date 2023/4/9 10:06 上午
+ */
+public class LocalMessageCache {
+    private static final Logger log = 
LoggerFactory.getLogger(LocalMessageCache.class);
+    private final BlockingQueue<ConsumeRequest> consumeRequestCache;
+    private final Map<String, ConsumeRequest> consumedRequest;
+    private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
+    private final DefaultMQPullConsumer rocketmqPullConsumer;
+    private final ClientConfig clientConfig;
+
+    LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final 
ClientConfig clientConfig) {
+        consumeRequestCache = new 
LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity());
+        this.consumedRequest = new ConcurrentHashMap<>();
+        this.pullOffsetTable = new ConcurrentHashMap<>();
+        this.rocketmqPullConsumer = rocketmqPullConsumer;
+        this.clientConfig = clientConfig;
+    }
+
+    int nextPullBatchNums() {
+        return Math.min(clientConfig.getRmqPullMessageBatchNums(), 
consumeRequestCache.remainingCapacity());
+    }
+
+    long nextPullOffset(MessageQueue remoteQueue) {
+        final AtomicReference<RuntimeException> outerException = new 
AtomicReference<>();
+        final Long existsOffset = pullOffsetTable.computeIfAbsent(remoteQueue, 
messageQueue -> {
+            try {
+                // Got -1 when MQBrokerException occurred, aka broker returns 
non-0 code, e.g. ResponseCode.QUERY_NOT_FOUND
+                // or any runtime exception thrown from rpc hook.
+                // Got -2 when other exception occurred, include broker not 
exists, network or client exception
+                long offset = 
rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false);
+                if (offset == -2) {
+                    outerException.set(new 
RuntimeException("fetchConsumeOffsetFromBroker exception, please check rocketmq 
client for more details"));
+                    return null;
+                }
+                if (offset == -1) {
+                    // Follow the CONSUME_FROM_WHERE to compute next pull 
offset
+                    // But note that if broker thrown any unexpected runtime 
exception may cause offset rollback.
+                    // We don't handle this risk because of MetaQ doesn't have 
any rpc hook
+                    final ConsumeFromWhere fromWhere = 
clientConfig.getConsumeFromWhere();
+                    switch (fromWhere) {
+                        case CONSUME_FROM_LAST_OFFSET:
+                            offset = 
this.rocketmqPullConsumer.maxOffset(remoteQueue);
+                            break;
+                        case CONSUME_FROM_FIRST_OFFSET:
+                            offset = 
this.rocketmqPullConsumer.minOffset(remoteQueue);
+                            break;
+                        case CONSUME_FROM_TIMESTAMP:
+                            offset = 
this.rocketmqPullConsumer.searchOffset(remoteQueue, 
clientConfig.getConsumeTimestamp());
+                            break;
+                    }
+                }
+                if (offset >= 0) {
+                    // Got an offset from offset store
+                    return offset;
+                }
+                // Maybe wrong ConsumeFromWhere configured, so couldn't find 
any offset
+                return null;
+            } catch (Exception e) {
+                outerException.set(new RuntimeException(e));
+                return null;
+            }
+        });
+        if (outerException.get() != null) {
+            throw outerException.get();
+        }
+        if (existsOffset == null) {
+            final String errorMsg = "[BUG] No offset found in offset store or 
pullOffsetTable without any exception";
+            log.warn(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+        return existsOffset;
+    }
+
+    void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
+        pullOffsetTable.put(remoteQueue, nextPullOffset);
+    }
+
+    void removePullOffset(MessageQueue remoteQueue) {
+        pullOffsetTable.remove(remoteQueue);
+    }
+
+    void shrinkPullOffsetTable(Set<MessageQueue> mqDivided) {
+        pullOffsetTable.entrySet().removeIf(next -> 
!mqDivided.contains(next.getKey()));
+    }
+
+    void submitConsumeRequest(ConsumeRequest consumeRequest) {
+        try {
+            consumeRequestCache.put(consumeRequest);
+        } catch (InterruptedException ignore) {
+        }
+    }
+
+    public List<MessageExt> poll(final int pullBatchSize, final Duration 
timeout) {
+        List<MessageExt> messageList = new ArrayList<>();
+        try {
+            List<ConsumeRequest> consumeRequestList = new ArrayList<>();
+            consumeRequestCache.drainTo(consumeRequestList, pullBatchSize);
+            if (consumeRequestList.size() == 0) {
+                final ConsumeRequest consumeRequest = 
consumeRequestCache.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+                if (consumeRequest != null) {
+                    consumeRequestList.add(consumeRequest);
+                    // drainTo again
+                    consumeRequestCache.drainTo(consumeRequestList, 
pullBatchSize - 1);
+                }
+            }
+            for (final ConsumeRequest consumeRequest : consumeRequestList) {
+                MessageExt messageExt = consumeRequest.getMessageExt();
+                consumedRequest.put(messageExt.getMsgId(), consumeRequest);
+                messageList.add(messageExt);
+            }
+        } catch (InterruptedException e) {
+            log.warn("Poll from local cache interrupted.", e);
+        }
+        return messageList;
+    }
+
+    public void commit(final List<String> messageList) {
+        for (final String msgId : messageList) {
+            ConsumeRequest consumeRequest = consumedRequest.remove(msgId);
+            if (consumeRequest != null) {
+                long offset = 
consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt()));
+                try {
+                    
rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), 
offset);
+                } catch (MQClientException e) {
+                    log.error("A error occurred in update consume offset 
process.", e);
+                }
+            }
+        }
+        if (clientConfig.isCommitSync()) {
+            
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().persistConsumerOffset();
+        }
+    }
+}
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/RetryPolicy.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/RetryPolicy.java
new file mode 100644
index 0000000..bd31d9c
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/RetryPolicy.java
@@ -0,0 +1,9 @@
+package 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer;
+
+/**
+ * @Author changfeng
+ * @Date 2023/4/9 10:10 上午
+ */
+public interface RetryPolicy {
+    long nextDelayDuration();
+}


Reply via email to