This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch runtimer in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 7e3579fca7f5ad582547e883ab6f91d8fd859cf9 Author: changfeng <[email protected]> AuthorDate: Thu Apr 13 11:43:12 2023 +0800 new implementation of RocketMQEventSubscriber --- adapter/runtimer/pom.xml | 8 +- .../boot/listener/RocketMQEventSubscriber.java | 134 +++++---- .../boot/listener/consumer/ClientConfig.java | 113 ++++++++ .../boot/listener/consumer/ConsumeRequest.java | 34 +++ .../listener/consumer/ExponentialRetryPolicy.java | 69 +++++ .../boot/listener/consumer/LitePullConsumer.java | 29 ++ .../listener/consumer/LitePullConsumerImpl.java | 299 +++++++++++++++++++++ .../boot/listener/consumer/LocalMessageCache.java | 157 +++++++++++ .../boot/listener/consumer/RetryPolicy.java | 9 + 9 files changed, 797 insertions(+), 55 deletions(-) diff --git a/adapter/runtimer/pom.xml b/adapter/runtimer/pom.xml index 1e7c7ed..dd8a653 100644 --- a/adapter/runtimer/pom.xml +++ b/adapter/runtimer/pom.xml @@ -40,7 +40,13 @@ <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> - <version>4.9.2</version> + <version>5.1.0</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-acl</artifactId> + <version>5.1.0</version> <scope>compile</scope> </dependency> <dependency> diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java index 1e504c1..10d25ac 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java @@ -18,9 +18,10 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener; import com.alibaba.fastjson.JSON; -import 
com.google.common.base.Splitter; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.gson.Gson; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.data.RecordOffset; @@ -30,23 +31,34 @@ import io.openmessaging.internal.DefaultKeyValue; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; -import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.ClientConfig; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.LitePullConsumer; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer.LitePullConsumerImpl; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.RefreshTypeEnum; import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine; import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver; import org.apache.rocketmq.eventbridge.exception.EventBridgeException; -import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.proxy.SocksProxyConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
import org.springframework.core.io.support.PropertiesLoaderUtils; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; import java.util.concurrent.CompletableFuture; /** @@ -56,16 +68,17 @@ public class RocketMQEventSubscriber extends EventSubscriber { private static final Logger logger = LoggerFactory.getLogger(RocketMQEventSubscriber.class); - private DefaultLitePullConsumer pullConsumer; + private LitePullConsumer pullConsumer; private final TargetRunnerConfigObserver runnerConfigObserver; private Integer pullTimeOut; - - private String namesrvAddr; - private Integer pullBatchSize; + private ClientConfig clientConfig; + private SessionCredentials sessionCredentials; + private String socksProxy; + private static final String SEMICOLON = ";"; private static final String SYS_DEFAULT_GROUP = "event-bridge-default-group"; @@ -81,7 +94,7 @@ public class RocketMQEventSubscriber extends EventSubscriber { @Override public void refresh(TargetRunnerConfig targetRunnerConfig, RefreshTypeEnum refreshTypeEnum) { if(Objects.isNull(pullConsumer)){ - pullConsumer = initDefaultMQPullConsumer(); + pullConsumer = initLitePullConsumer(); return; } Set<String> currentTopics = parseTopicsByRunnerConfigs(Sets.newHashSet(targetRunnerConfig)); @@ -102,7 +115,7 @@ public class RocketMQEventSubscriber extends EventSubscriber { @Override public List<ConnectRecord> pull() { - List<MessageExt> messages = pullConsumer.poll(pullTimeOut); + List<MessageExt> messages = pullConsumer.poll(pullBatchSize, Duration.ofSeconds(pullTimeOut)); if (CollectionUtils.isEmpty(messages)) { logger.info("consumer poll message empty , consumer - {}", JSON.toJSONString(pullConsumer)); return null; @@ -156,10 +169,43 @@ public class RocketMQEventSubscriber extends EventSubscriber { */ private void initMqProperties() { try { + 
ClientConfig clientConfig = new ClientConfig(); Properties properties = PropertiesLoaderUtils.loadAllProperties("runtimer.properties"); - namesrvAddr = properties.getProperty("rocketmq.namesrvAddr"); + String namesrvAddr = properties.getProperty("rocketmq.namesrvAddr"); + String consumerGroup = properties.getProperty("rocketmq.consumerGroup"); pullTimeOut = Integer.valueOf(properties.getProperty("rocketmq.consumer.pullTimeOut")); pullBatchSize = Integer.valueOf(properties.getProperty("rocketmq.consumer.pullBatchSize")); + String accessChannel = properties.getProperty("rocketmq.accessChannel"); + String namespace = properties.getProperty("rocketmq.namespace"); + String accessKey = properties.getProperty("rocketmq.consumer.accessKey"); + String secretKey = properties.getProperty("rocketmq.consumer.secretKey"); + String socks5UserName = properties.getProperty("rocketmq.consumer.socks5UserName"); + String socks5Password = properties.getProperty("rocketmq.consumer.socks5Password"); + String socks5Endpoint = properties.getProperty("rocketmq.consumer.socks5Endpoint"); + + clientConfig.setNameSrvAddr(namesrvAddr); + clientConfig.setConsumerGroup(StringUtils.isBlank(consumerGroup) ? + createGroupName(SYS_DEFAULT_GROUP) : consumerGroup); + clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ? 
+ AccessChannel.CLOUD : AccessChannel.LOCAL); + clientConfig.setNamespace(namespace); + this.clientConfig = clientConfig; + + if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) { + this.sessionCredentials = new SessionCredentials(accessKey, secretKey); + } + + if (StringUtils.isNotBlank(socks5UserName) && StringUtils.isNotBlank(socks5Password) + && StringUtils.isNotBlank(socks5Endpoint)) { + SocksProxyConfig proxyConfig = new SocksProxyConfig(); + proxyConfig.setUsername(socks5UserName); + proxyConfig.setPassword(socks5Password); + proxyConfig.setAddr(socks5Endpoint); + Map<String, SocksProxyConfig> proxyConfigMap = Maps.newHashMap(); + proxyConfigMap.put("0.0.0.0/0", proxyConfig); + this.socksProxy = new Gson().toJson(proxyConfigMap); + } + }catch (Exception exception){ logger.error("init rocket mq property exception, stack trace-", exception); } @@ -169,67 +215,51 @@ public class RocketMQEventSubscriber extends EventSubscriber { * init rocket mq pull consumer */ private void initPullConsumer() { - pullConsumer = initDefaultMQPullConsumer(); + pullConsumer = initLitePullConsumer(); } /** * first init default rocketmq pull consumer * @return */ - public DefaultLitePullConsumer initDefaultMQPullConsumer () { + public LitePullConsumer initLitePullConsumer() { + if (pullConsumer != null) { + try { + pullConsumer.shutdown(); + } catch (Exception e) { + logger.error("Consumer shutdown failed.", e); + } + } + Set<TargetRunnerConfig> targetRunnerConfigs = runnerConfigObserver.getTargetRunnerConfig(); Set<String> topics = parseTopicsByRunnerConfigs(targetRunnerConfigs); - DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(); - consumer.setConsumerGroup(createGroupName(SYS_DEFAULT_GROUP)); - consumer.setNamesrvAddr(namesrvAddr); - consumer.setPullBatchSize(pullBatchSize); + + RPCHook rpcHook = this.sessionCredentials != null ? 
new AclClientRPCHook(this.sessionCredentials) : null; + this.pullConsumer = new LitePullConsumerImpl(this.clientConfig, rpcHook); + if (StringUtils.isNotBlank(this.socksProxy)) { + pullConsumer.setSockProxyJson(this.socksProxy); + } try { for(String topic : topics){ - consumer.subscribe(topic, "*"); + pullConsumer.attachTopic(topic, "*"); } - consumer.start(); + pullConsumer.startup(); } catch (Exception exception) { logger.error("init default pull consumer exception, topic -" + topics.toString() + "-stackTrace-", exception); throw new EventBridgeException(" init rocketmq consumer failed"); } - return consumer; + return pullConsumer; } private String createGroupName(String prefix) { StringBuilder sb = new StringBuilder(); sb.append(prefix).append("-"); - sb.append(RemotingUtil.getLocalAddress()).append("-"); + sb.append(NetworkUtil.getLocalAddress()).append("-"); sb.append(UtilAll.getPid()).append("-"); sb.append(System.nanoTime()); return sb.toString().replace(".", "-"); } - private String createInstance(String servers) { - String[] serversArray = servers.split(";"); - List<String> serversList = new ArrayList<String>(); - for (String server : serversArray) { - if (!serversList.contains(server)) { - serversList.add(server); - } - } - Collections.sort(serversList); - return String.valueOf(serversList.toString().hashCode()); - } - - /** - * parse msg queue by queue json - * - * @param messageQueueStr - * @return - */ - private MessageQueue parseMessageQueueList(String messageQueueStr) { - List<String> messageQueueStrList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr); - if (CollectionUtils.isEmpty(messageQueueStrList) || messageQueueStrList.size() != 3) { - return null; - } - return new MessageQueue(messageQueueStrList.get(0), messageQueueStrList.get(1), Integer.valueOf(messageQueueStrList.get(2))); - } - /** * MessageExt convert to connect record * @param messageExt @@ -281,11 +311,7 @@ public class RocketMQEventSubscriber 
extends EventSubscriber { * @param topic */ private void subscribe(String topic) { - try { - pullConsumer.subscribe(topic, "*"); - } catch (MQClientException exception) { - logger.error("rocketmq event subscribe new topic failed, stack trace - ", exception); - } + pullConsumer.subscribe(topic); } /** diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java new file mode 100644 index 0000000..255fdfd --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ClientConfig.java @@ -0,0 +1,113 @@ +package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer; + +import org.apache.rocketmq.client.AccessChannel; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; + +/** + * @Author changfeng + * @Date 2023/4/9 10:08 上午 + */ +public class ClientConfig { + private int rmqPullMessageCacheCapacity = 1000; + private int rmqPullMessageBatchNums = 20; + private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; + private long consumeTimestamp = System.currentTimeMillis(); + private String nameSrvAddr; + private String namespace; + private String consumerGroup; + private int pullInterval = 0; + // All the offsets will be committed in the commit thread if enable this flag. 
+ // To avoid too many rpc calls, disable it and rely on the inner offset automatic commit mechanism + private boolean commitSync = false; + private String routeHookEndpoint; + private AccessChannel accessChannel; + + public int getRmqPullMessageCacheCapacity() { + return rmqPullMessageCacheCapacity; + } + + public void setRmqPullMessageCacheCapacity(final int capacity) { + this.rmqPullMessageCacheCapacity = capacity; + } + + public int getRmqPullMessageBatchNums() { + return rmqPullMessageBatchNums; + } + + public void setRmqPullMessageBatchNums(final int nums) { + this.rmqPullMessageBatchNums = nums; + } + + public ConsumeFromWhere getConsumeFromWhere() { + return consumeFromWhere; + } + + public void setConsumeFromWhere( + final ConsumeFromWhere consumeFromWhere) { + this.consumeFromWhere = consumeFromWhere; + } + + public long getConsumeTimestamp() { + return consumeTimestamp; + } + + public void setConsumeTimestamp(final long consumeTimestamp) { + this.consumeTimestamp = consumeTimestamp; + } + + public String getNameSrvAddr() { + return nameSrvAddr; + } + + public void setNameSrvAddr(final String nameSrvAddr) { + this.nameSrvAddr = nameSrvAddr; + } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(final String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public int getPullInterval() { + return pullInterval; + } + + public void setPullInterval(final int pullInterval) { + this.pullInterval = pullInterval; + } + + public boolean isCommitSync() { + return commitSync; + } + + public void setCommitSync(final boolean commitSync) { + this.commitSync = commitSync; + } + + public String getRouteHookEndpoint() { + return routeHookEndpoint; + } + + public void setRouteHookEndpoint(final String routeHookEndpoint) { + this.routeHookEndpoint = routeHookEndpoint; + } + 
+ public AccessChannel getAccessChannel() { + return accessChannel; + } + + public void setAccessChannel(AccessChannel accessChannel) { + this.accessChannel = accessChannel; + } +} diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java new file mode 100644 index 0000000..ac949ea --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ConsumeRequest.java @@ -0,0 +1,34 @@ +package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer; + +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +/** + * @Author changfeng + * @Date 2023/4/9 10:07 上午 + */ +public class ConsumeRequest { + private final MessageExt messageExt; + private final MessageQueue messageQueue; + private final ProcessQueue processQueue; + + public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue, + final ProcessQueue processQueue) { + this.messageExt = messageExt; + this.messageQueue = messageQueue; + this.processQueue = processQueue; + } + + public MessageExt getMessageExt() { + return messageExt; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public ProcessQueue getProcessQueue() { + return processQueue; + } +} diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java new file mode 100644 index 0000000..9ba9316 --- /dev/null +++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/ExponentialRetryPolicy.java @@ -0,0 +1,69 @@ +package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer; + +import com.google.common.base.MoreObjects; + +import java.util.concurrent.TimeUnit; + +/** + * @Author changfeng + * @Date 2023/4/9 10:11 上午 + */ +public class ExponentialRetryPolicy implements RetryPolicy { + private long initial = TimeUnit.SECONDS.toMillis(5); + private long max = TimeUnit.HOURS.toMillis(2); + private long multiplier = 2; + private int retryCount = 0; + + public ExponentialRetryPolicy() { + } + + public ExponentialRetryPolicy(long initial, long max, long multiplier) { + this.initial = initial; + this.max = max; + this.multiplier = multiplier; + } + + public long getInitial() { + return initial; + } + + public void setInitial(long initial) { + this.initial = initial; + } + + public long getMax() { + return max; + } + + public void setMax(long max) { + this.max = max; + } + + public long getMultiplier() { + return multiplier; + } + + public void setMultiplier(long multiplier) { + this.multiplier = multiplier; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("initial", initial) + .add("max", max) + .add("multiplier", multiplier) + .toString(); + } + + @Override + public long nextDelayDuration() { + if (retryCount < 0) { + retryCount = 0; + } + if (retryCount > 32) { + retryCount = 32; + } + return Math.min(max, initial * (long) Math.pow(multiplier, retryCount++)); + } +} diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java new file mode 100644 index 0000000..d2e2671 --- /dev/null +++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumer.java @@ -0,0 +1,29 @@ +package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; + +import java.time.Duration; +import java.util.List; + +/** + * @Author changfeng + * @Date 2023/4/9 10:09 上午 + */ +public interface LitePullConsumer { + void startup() throws MQClientException; + + void shutdown(); + + void attachTopic(String topic, String tag); + + List<MessageExt> poll(int pullBatchSize, Duration timeout); + + void commit(List<String> messageIdList); + + void setSockProxyJson(String proxyJson); + + void subscribe(String topic); + + void unsubscribe(String topic); +} diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java new file mode 100644 index 0000000..0f887cc --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LitePullConsumerImpl.java @@ -0,0 +1,299 @@ +package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.consumer.PullCallback; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import 
org.apache.rocketmq.common.ServiceState; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.ThreadUtils; +import org.apache.rocketmq.remoting.RPCHook; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @Author changfeng + * @Date 2023/4/9 10:10 上午 + */ +public class LitePullConsumerImpl implements LitePullConsumer { + private static final Logger log = LoggerFactory.getLogger(LitePullConsumerImpl.class); + private final DefaultMQPullConsumer rocketmqPullConsumer; + private final LocalMessageCache localMessageCache; + private final ConcurrentHashMap<MessageQueue, ExponentialRetryPolicy> retryPolicies; + private final ClientConfig clientConfig; + private final Map<MessageQueue, ProcessQueue> runningQueueMap = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduleService = new ScheduledThreadPoolExecutor(1, + ThreadUtils.newThreadFactory("PullConsumerScheduleService", false)); + private final ExecutorService executorService = Executors.newSingleThreadExecutor( + ThreadUtils.newThreadFactory("PullConsumerExecutorService", false)); + + public LitePullConsumerImpl(final ClientConfig clientConfig, final RPCHook rpcHook) { + this.clientConfig = clientConfig; + rocketmqPullConsumer = new DefaultMQPullConsumer(clientConfig.getConsumerGroup(), rpcHook); + rocketmqPullConsumer.setNamesrvAddr(clientConfig.getNameSrvAddr()); + 
rocketmqPullConsumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle()); + rocketmqPullConsumer.setInstanceName(buildInstanceName(clientConfig.getNameSrvAddr(), clientConfig.getConsumerGroup())); + if (clientConfig.getAccessChannel() != null) { + rocketmqPullConsumer.setAccessChannel(clientConfig.getAccessChannel()); + } + if (StringUtils.isNotBlank(clientConfig.getNamespace())) { + rocketmqPullConsumer.setNamespace(clientConfig.getNamespace()); + } + localMessageCache = new LocalMessageCache(rocketmqPullConsumer, clientConfig); + retryPolicies = new ConcurrentHashMap<>(); + } + + @Override + public void startup() throws MQClientException { + rocketmqPullConsumer.start(); + log.info("RocketmqPullConsumer start."); + } + + @Override + public void shutdown() { + rocketmqPullConsumer.shutdown(); + shutdownThreadPool(executorService); + shutdownThreadPool(scheduleService); + } + + private void shutdownThreadPool(ExecutorService executor) { + if (executor != null) { + executor.shutdown(); + try { + executor.awaitTermination(60, TimeUnit.SECONDS); + } catch (Exception e) { + log.error("Shutdown threadPool failed", e); + } + if (!executor.isTerminated()) { + executor.shutdownNow(); + } + } + } + + @Override + public void attachTopic(final String topic, final String tag) { + rocketmqPullConsumer.setRegisterTopics(new HashSet<>(Collections.singletonList(topic))); + rocketmqPullConsumer.registerMessageQueueListener(topic, new MessageQueueListener() { + @Override + public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { + submitPullTask(topic, tag, mqDivided); + localMessageCache.shrinkPullOffsetTable(mqDivided); + retryPolicies.entrySet().removeIf(next -> !mqDivided.contains(next.getKey())); + log.info("Load balance result of topic {} changed, mqAll {}, mqDivided {}.", topic, mqAll, mqDivided); + } + }); + } + + @Override + public void subscribe(final String topic) { + 
rocketmqPullConsumer.getDefaultMQPullConsumerImpl().subscriptionAutomatically(topic); + + } + + @Override + public void unsubscribe(final String topic) { + rocketmqPullConsumer.getDefaultMQPullConsumerImpl().unsubscribe(topic); + } + + @Override + public List<MessageExt> poll(final int pullBatchSize, final Duration timeout) { + return localMessageCache.poll(pullBatchSize, timeout); + } + + @Override + public void commit(final List<String> messageIdList) { + localMessageCache.commit(messageIdList); + } + + @Override + public void setSockProxyJson(final String proxyJson) { + rocketmqPullConsumer.setSocksProxyConfig(proxyJson); + } + + private String buildInstanceName(String nameSrvAddr, String consumerGroup) { + return UtilAll.getPid() + + "#" + nameSrvAddr.hashCode() + + "#" + consumerGroup + + "#" + System.nanoTime(); + } + + private RetryPolicy getRetryPolicy(MessageQueue messageQueue) { + return retryPolicies.computeIfAbsent(messageQueue, queue -> + new ExponentialRetryPolicy(Duration.ofMillis(100).toMillis(), Duration.ofMinutes(10).toMillis(), 2)); + } + + private void removeRetryPolicy(MessageQueue messageQueue) { + retryPolicies.remove(messageQueue); + } + + private void submitPullTask(String topic, String tag, Set<MessageQueue> assignedQueues) { + Set<MessageQueue> runningQueues = runningQueueMap.keySet(); + for (MessageQueue runningQueue : runningQueues) { + if (runningQueue == null || !assignedQueues.contains(runningQueue)) { + ProcessQueue processQueue = runningQueueMap.remove(runningQueue); + if (processQueue != null) { + processQueue.setDropped(true); + } + } + } + if (CollectionUtils.isEmpty(assignedQueues)) { + log.warn("Not found any messageQueue, topic:{}", topic); + return; + } + + for (MessageQueue messageQueue : assignedQueues) { + ProcessQueue processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl() + .getProcessQueueTable().get(messageQueue); + if (runningQueueMap.putIfAbsent(messageQueue, processQueue) == null) { + 
try { + PullTask pullTask = new PullTask(messageQueue, tag); + executorService.submit(pullTask); + log.info("Submit pullTask:{}", messageQueue); + } catch (Exception e) { + log.error("Failed submit pullTask:{}, {}, wait next balancing", topic, messageQueue, e); + // 添加pull失败,等待下次 rebalance + processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl() + .getProcessQueueTable().remove(messageQueue); + if (processQueue != null) { + processQueue.setDropped(true); + } + runningQueueMap.remove(messageQueue); + } + } + } + + } + + void pullLater(PullTask pullTask, long delay, TimeUnit unit) { + scheduleService.schedule(new Runnable() { + @Override + public void run() { + pullTask.run(); + } + }, delay, unit); + } + + class PullTask implements Runnable { + private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000; + + private final String tag; + private final MessageQueue messageQueue; + + public PullTask(MessageQueue messageQueue, String tag) { + this.messageQueue = messageQueue; + this.tag = tag; + } + + @Override + public void run() { + try { + if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) { + log.warn("RocketmqPullConsumer not running, pullTask exit."); + return; + } + ProcessQueue processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl() + .getProcessQueueTable().get(messageQueue); + if (processQueue == null || processQueue.isDropped()) { + log.info("ProcessQueue {} dropped, pullTask exit", messageQueue); + return; + } + long offset = localMessageCache.nextPullOffset(messageQueue); + int batchNums = localMessageCache.nextPullBatchNums(); + // If batchNums is zero, an exception will be thrown and then trigger a delay + if (batchNums <= 0) { + final int delayTimeMillis = (int) getRetryPolicy(messageQueue).nextDelayDuration(); + log.warn("Local cache is full, delay the pull task {} ms for message queue {}", + delayTimeMillis, messageQueue); + 
pullLater(PullTask.this, delayTimeMillis, TimeUnit.MILLISECONDS); + } + + rocketmqPullConsumer.pullBlockIfNotFound(this.messageQueue, this.tag, offset, batchNums, new PullCallback() { + @Override + public void onSuccess(PullResult pullResult) { + try { + if (!ServiceState.RUNNING.equals(rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getServiceState())) { + log.warn("rocketmqPullConsumer not running, pullTask exit."); + return; + } + + ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl() + .getProcessQueueTable().get(messageQueue); + switch (pullResult.getPullStatus()) { + case FOUND: + if (pq != null && !pq.isDropped()) { + pq.putMessage(pullResult.getMsgFoundList()); + for (final MessageExt messageExt : pullResult.getMsgFoundList()) { + localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, messageQueue, pq)); + } + localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); + removeRetryPolicy(messageQueue); + executorService.submit(PullTask.this); + } else { + localMessageCache.removePullOffset(messageQueue); + log.info("ProcessQueue {} dropped, discard the pulled message.", messageQueue); + removeRetryPolicy(messageQueue); + } + break; + case OFFSET_ILLEGAL: + final int delayTimeMillis = (int) getRetryPolicy(messageQueue).nextDelayDuration(); + log.warn("The pull request offset is illegal, offset is {}, message queue is {}, " + + "pull result is {}, delay {} ms for next pull", + offset, messageQueue, pullResult, delayTimeMillis); + localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); + pullLater(PullTask.this, delayTimeMillis, TimeUnit.MILLISECONDS); + break; + case NO_NEW_MSG: + case NO_MATCHED_MSG: + log.info("No NEW_MSG or MATCHED_MSG for mq:{}, pull again.", messageQueue); + localMessageCache.updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); + removeRetryPolicy(messageQueue); + executorService.submit(PullTask.this); + break; + default: 
+ log.warn("Failed to process pullResult, mq:{} {}", messageQueue, pullResult); + break; + } + } catch (Throwable t) { + log.error("Exception occurs when process pullResult", t); + pullLater(PullTask.this, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION, TimeUnit.MILLISECONDS); + } + } + + @Override + public void onException(Throwable e) { + final int delayTimeMillis = (int) getRetryPolicy(messageQueue).nextDelayDuration(); + log.error("Exception happens when pull message process, delay {} ms for message queue {}", + delayTimeMillis, messageQueue, e); + pullLater(PullTask.this, delayTimeMillis, TimeUnit.MILLISECONDS); + } + }); + } catch (Throwable t) { + final int delayTimeMillis = (int) getRetryPolicy(messageQueue).nextDelayDuration(); + log.error("Error occurs when pull message process, delay {} ms for message queue {}", + delayTimeMillis, messageQueue, t); + pullLater(PullTask.this, delayTimeMillis, TimeUnit.MILLISECONDS); + } + } + + } +} diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java new file mode 100644 index 0000000..4068fa1 --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/LocalMessageCache.java @@ -0,0 +1,157 @@ +package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.consumer; + +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import 
java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * @Author changfeng + * @Date 2023/4/9 10:06 上午 + */ +public class LocalMessageCache { + private static final Logger log = LoggerFactory.getLogger(LocalMessageCache.class); + private final BlockingQueue<ConsumeRequest> consumeRequestCache; + private final Map<String, ConsumeRequest> consumedRequest; + private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable; + private final DefaultMQPullConsumer rocketmqPullConsumer; + private final ClientConfig clientConfig; + + LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) { + consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity()); + this.consumedRequest = new ConcurrentHashMap<>(); + this.pullOffsetTable = new ConcurrentHashMap<>(); + this.rocketmqPullConsumer = rocketmqPullConsumer; + this.clientConfig = clientConfig; + } + + int nextPullBatchNums() { + return Math.min(clientConfig.getRmqPullMessageBatchNums(), consumeRequestCache.remainingCapacity()); + } + + long nextPullOffset(MessageQueue remoteQueue) { + final AtomicReference<RuntimeException> outerException = new AtomicReference<>(); + final Long existsOffset = pullOffsetTable.computeIfAbsent(remoteQueue, messageQueue -> { + try { + // Got -1 when MQBrokerException occurred, aka broker returns non-0 code, e.g. ResponseCode.QUERY_NOT_FOUND + // or any runtime exception thrown from rpc hook. 
+ // Got -2 when other exception occurred, include broker not exists, network or client exception + long offset = rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false); + if (offset == -2) { + outerException.set(new RuntimeException("fetchConsumeOffsetFromBroker exception, please check rocketmq client for more details")); + return null; + } + if (offset == -1) { + // Follow the CONSUME_FROM_WHERE to compute next pull offset + // But note that if broker thrown any unexpected runtime exception may cause offset rollback. + // We don't handle this risk because of MetaQ doesn't have any rpc hook + final ConsumeFromWhere fromWhere = clientConfig.getConsumeFromWhere(); + switch (fromWhere) { + case CONSUME_FROM_LAST_OFFSET: + offset = this.rocketmqPullConsumer.maxOffset(remoteQueue); + break; + case CONSUME_FROM_FIRST_OFFSET: + offset = this.rocketmqPullConsumer.minOffset(remoteQueue); + break; + case CONSUME_FROM_TIMESTAMP: + offset = this.rocketmqPullConsumer.searchOffset(remoteQueue, clientConfig.getConsumeTimestamp()); + break; + } + } + if (offset >= 0) { + // Got an offset from offset store + return offset; + } + // Maybe wrong ConsumeFromWhere configured, so couldn't find any offset + return null; + } catch (Exception e) { + outerException.set(new RuntimeException(e)); + return null; + } + }); + if (outerException.get() != null) { + throw outerException.get(); + } + if (existsOffset == null) { + final String errorMsg = "[BUG] No offset found in offset store or pullOffsetTable without any exception"; + log.warn(errorMsg); + throw new RuntimeException(errorMsg); + } + return existsOffset; + } + + void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) { + pullOffsetTable.put(remoteQueue, nextPullOffset); + } + + void removePullOffset(MessageQueue remoteQueue) { + pullOffsetTable.remove(remoteQueue); + } + + void shrinkPullOffsetTable(Set<MessageQueue> mqDivided) { + pullOffsetTable.entrySet().removeIf(next -> !mqDivided.contains(next.getKey())); + 
} + + void submitConsumeRequest(ConsumeRequest consumeRequest) { + try { + consumeRequestCache.put(consumeRequest); + } catch (InterruptedException ignore) { + } + } + + public List<MessageExt> poll(final int pullBatchSize, final Duration timeout) { + List<MessageExt> messageList = new ArrayList<>(); + try { + List<ConsumeRequest> consumeRequestList = new ArrayList<>(); + consumeRequestCache.drainTo(consumeRequestList, pullBatchSize); + if (consumeRequestList.size() == 0) { + final ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout.toMillis(), TimeUnit.MILLISECONDS); + if (consumeRequest != null) { + consumeRequestList.add(consumeRequest); + // drainTo again + consumeRequestCache.drainTo(consumeRequestList, pullBatchSize - 1); + } + } + for (final ConsumeRequest consumeRequest : consumeRequestList) { + MessageExt messageExt = consumeRequest.getMessageExt(); + consumedRequest.put(messageExt.getMsgId(), consumeRequest); + messageList.add(messageExt); + } + } catch (InterruptedException e) { + log.warn("Poll from local cache interrupted.", e); + } + return messageList; + } + + public void commit(final List<String> messageList) { + for (final String msgId : messageList) { + ConsumeRequest consumeRequest = consumedRequest.remove(msgId); + if (consumeRequest != null) { + long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt())); + try { + rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); + } catch (MQClientException e) { + log.error("A error occurred in update consume offset process.", e); + } + } + } + if (clientConfig.isCommitSync()) { + rocketmqPullConsumer.getDefaultMQPullConsumerImpl().persistConsumerOffset(); + } + } +} diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/consumer/RetryPolicy.java 
/**
 * Strategy that decides how long to wait before the next retry attempt.
 *
 * @author changfeng
 * Created 2023/4/9 10:10 AM
 */
public interface RetryPolicy {

    /**
     * Returns the delay to apply before the next retry.
     *
     * @return the next delay (presumably in milliseconds; confirm against the implementations)
     */
    long nextDelayDuration();
}
