Clean expired message in oms pull consumer.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/85e85123 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/85e85123 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/85e85123 Branch: refs/heads/openmessaging-impl Commit: 85e85123d9380e34a138d3081f992717e10e8292 Parents: 2e3c1b0 Author: yukon <[email protected]> Authored: Wed Apr 19 17:12:29 2017 +0800 Committer: yukon <[email protected]> Committed: Wed Apr 19 17:12:29 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/consumer/LocalMessageCache.java | 105 ++++++++++++++++++- .../rocketmq/consumer/PullConsumerImpl.java | 3 + 2 files changed, 105 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/85e85123/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index 0ffd36c..9afc4c9 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -18,27 +18,39 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.KeyValue; import io.openmessaging.PropertyKeys; +import io.openmessaging.ServiceLifecycle; import io.openmessaging.rocketmq.ClientConfig; import io.openmessaging.rocketmq.domain.ConsumeRequest; import java.util.Collections; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.ThreadUtils; import org.slf4j.Logger; -class LocalMessageCache { +class LocalMessageCache implements ServiceLifecycle { private final BlockingQueue<ConsumeRequest> consumeRequestCache; private final Map<String, ConsumeRequest> consumedRequest; private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable; private final DefaultMQPullConsumer rocketmqPullConsumer; private final ClientConfig clientConfig; + private final ScheduledExecutorService cleanExpireMsgExecutors; + private final static Logger log = ClientLogger.getLog(); LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) { @@ -47,6 +59,8 @@ class LocalMessageCache { this.pullOffsetTable = new ConcurrentHashMap<>(); this.rocketmqPullConsumer = rocketmqPullConsumer; this.clientConfig = clientConfig; + this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "OMS_CleanExpireMsgScheduledThread_")); } int nextPullBatchNums() { @@ -92,9 +106,11 @@ class LocalMessageCache { try { ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS); if (consumeRequest != null) { + MessageExt messageExt = consumeRequest.getMessageExt(); consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); - consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); - return consumeRequest.getMessageExt(); + MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis())); + consumedRequest.put(messageExt.getMsgId(), consumeRequest); + return messageExt; } } catch (InterruptedException ignore) { } @@ -112,4 +128,87 @@ class LocalMessageCache { } } } + + void ack(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) { + consumedRequest.remove(messageExt.getMsgId()); + long offset = processQueue.removeMessage(Collections.singletonList(messageExt)); + try { + rocketmqPullConsumer.updateConsumeOffset(messageQueue, offset); + } catch (MQClientException e) { + log.error("A error occurred in update consume offset process.", e); + } + } + + @Override + public void startup() { + this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + cleanExpireMsg(); + } + }, clientConfig.getRmqMessageConsumeTimeout(), clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES); + } + + @Override + public void shutdown() { + ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS); + } + + private void cleanExpireMsg() { + for (final Map.Entry<MessageQueue, ProcessQueue> next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl() + .getRebalanceImpl().getProcessQueueTable().entrySet()) { + ProcessQueue pq = next.getValue(); + MessageQueue mq = next.getKey(); + ReadWriteLock lockTreeMap = getLockInProcessQueue(pq); + if (lockTreeMap == null) { + log.error("Gets tree map lock in process queue error, may be has compatibility issue"); + return; + } + + TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap(); + + int loop = msgTreeMap.size(); + for (int i = 0; i < loop; i++) { + MessageExt msg = null; + try { + lockTreeMap.readLock().lockInterruptibly(); + try { + if (!msgTreeMap.isEmpty()) { + msg = msgTreeMap.firstEntry().getValue(); + System.out.println(msg); + if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg)) + > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) { + //Expired, ack and remove it. + } else { + break; + } + } else { + break; + } + } finally { + lockTreeMap.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("Gets expired message exception", e); + } + + try { + rocketmqPullConsumer.sendMessageBack(msg, 3); + log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", + msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); + ack(mq, pq, msg); + } catch (Exception e) { + log.error("Send back expired msg exception", e); + } + } + } + } + + private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) { + try { + return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true); + } catch (IllegalAccessException e) { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/85e85123/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 56a49a4..5d4e7d9 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -114,6 +114,7 @@ public class PullConsumerImpl implements PullConsumer { try { registerPullTaskCallback(); this.pullConsumerScheduleService.start(); + this.localMessageCache.startup(); } catch (MQClientException e) { throw new OMSRuntimeException("-1", e); } @@ -136,6 +137,7 @@ public class PullConsumerImpl implements PullConsumer { switch (pullResult.getPullStatus()) { case FOUND: if (pq != null) { + pq.putMessage(pullResult.getMsgFoundList()); for (final MessageExt messageExt : pullResult.getMsgFoundList()) { localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq)); } @@ -155,6 +157,7 @@ public class PullConsumerImpl implements PullConsumer { @Override public synchronized void shutdown() { if (this.started) { + this.localMessageCache.shutdown(); this.pullConsumerScheduleService.shutdown(); this.rocketmqPullConsumer.shutdown(); }
