http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java new file mode 100644 index 0000000..82c342f --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -0,0 +1,1071 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.client.QueryResult; +import com.alibaba.rocketmq.client.Validators; +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.consumer.PullCallback; +import com.alibaba.rocketmq.client.consumer.PullResult; +import com.alibaba.rocketmq.client.consumer.listener.MessageListener; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; +import com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore; +import com.alibaba.rocketmq.client.consumer.store.OffsetStore; +import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType; +import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.hook.ConsumeMessageContext; +import com.alibaba.rocketmq.client.hook.ConsumeMessageHook; +import com.alibaba.rocketmq.client.hook.FilterMessageHook; +import com.alibaba.rocketmq.client.impl.CommunicationMode; +import com.alibaba.rocketmq.client.impl.MQClientManager; +import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.client.stat.ConsumerStatsManager; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.ServiceState; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; +import com.alibaba.rocketmq.common.filter.FilterAPI; +import com.alibaba.rocketmq.common.help.FAQUrl; +import com.alibaba.rocketmq.common.message.*; +import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus; +import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; +import 
com.alibaba.rocketmq.common.protocol.body.ProcessQueueInfo; +import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan; +import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import com.alibaba.rocketmq.common.protocol.route.BrokerData; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.common.sysflag.PullSysFlag; +import com.alibaba.rocketmq.remoting.RPCHook; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * @author shijia.wxr + */ +public class DefaultMQPushConsumerImpl implements MQConsumerInner { + /** + * Delay some time when exception occur + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000; + /** + * Flow control interval + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; + /** + * Delay some time when suspend pull service + */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000; + private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15; + private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30; + private final Logger log = ClientLogger.getLog(); + private final DefaultMQPushConsumer defaultMQPushConsumer; + private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); + private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); + private final long consumerStartTimestamp = System.currentTimeMillis(); + private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>(); + private final RPCHook rpcHook; + private ServiceState serviceState = 
ServiceState.CREATE_JUST; + private MQClientInstance mQClientFactory; + private PullAPIWrapper pullAPIWrapper; + private volatile boolean pause = false; + private boolean consumeOrderly = false; + private MessageListener messageListenerInner; + private OffsetStore offsetStore; + private ConsumeMessageService consumeMessageService; + private long flowControlTimes1 = 0; + private long flowControlTimes2 = 0; + + + public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { + this.defaultMQPushConsumer = defaultMQPushConsumer; + this.rpcHook = rpcHook; + } + + public void registerFilterMessageHook(final FilterMessageHook hook) { + this.filterMessageHookList.add(hook); + log.info("register FilterMessageHook Hook, {}", hook.hookName()); + } + + public boolean hasHook() { + return !this.consumeMessageHookList.isEmpty(); + } + + public void registerConsumeMessageHook(final ConsumeMessageHook hook) { + this.consumeMessageHookList.add(hook); + log.info("register consumeMessageHook Hook, {}", hook.hookName()); + } + + public void executeHookBefore(final ConsumeMessageContext context) { + if (!this.consumeMessageHookList.isEmpty()) { + for (ConsumeMessageHook hook : this.consumeMessageHookList) { + try { + hook.consumeMessageBefore(context); + } catch (Throwable e) { + } + } + } + } + + public void executeHookAfter(final ConsumeMessageContext context) { + if (!this.consumeMessageHookList.isEmpty()) { + for (ConsumeMessageHook hook : this.consumeMessageHookList) { + try { + hook.consumeMessageAfter(context); + } catch (Throwable e) { + } + } + } + } + + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + createTopic(key, newTopic, queueNum, 0); + } + + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); + } + + public Set<MessageQueue> 
fetchSubscribeMessageQueues(String topic) throws MQClientException { + Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic); + if (null == result) { + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic); + } + + if (null == result) { + throw new MQClientException("The topic[" + topic + "] not exist", null); + } + + return result; + } + + public DefaultMQPushConsumer getDefaultMQPushConsumer() { + return defaultMQPushConsumer; + } + + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); + } + + public long maxOffset(MessageQueue mq) throws MQClientException { + return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); + } + + public long minOffset(MessageQueue mq) throws MQClientException { + return this.mQClientFactory.getMQAdminImpl().minOffset(mq); + } + + public OffsetStore getOffsetStore() { + return offsetStore; + } + + public void setOffsetStore(OffsetStore offsetStore) { + this.offsetStore = offsetStore; + } + + public void pullMessage(final PullRequest pullRequest) { + final ProcessQueue processQueue = pullRequest.getProcessQueue(); + if (processQueue.isDropped()) { + log.info("the pull request[{}] is dropped.", pullRequest.toString()); + return; + } + + pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); + + try { + this.makeSureStateOK(); + } catch (MQClientException e) { + log.warn("pullMessage exception, consumer state not ok", e); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + return; + } + + if (this.isPause()) { + log.warn("consumer was paused, execute pull request later. 
instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); + return; + } + + long size = processQueue.getMsgCount().get(); + if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) { + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + if ((flowControlTimes1++ % 1000) == 0) { + log.warn( + "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}", + processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1); + } + return; + } + + if (!this.consumeOrderly) { + if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); + if ((flowControlTimes2++ % 1000) == 0) { + log.warn( + "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", + processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), + pullRequest, flowControlTimes2); + } + return; + } + } else { + if (processQueue.isLocked()) { + if (!pullRequest.isLockedFirst()) { + final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); + boolean brokerBusy = offset < pullRequest.getNextOffset(); + log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", + pullRequest, offset, brokerBusy); + if (brokerBusy) { + log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. 
pullRequest: {} NewOffset: {}", + pullRequest, offset); + } + + pullRequest.setLockedFirst(true); + pullRequest.setNextOffset(offset); + } + } else { + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + log.info("pull message later because not locked in broker, {}", pullRequest); + return; + } + } + + final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); + if (null == subscriptionData) { + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + log.warn("find the consumer's subscription failed, {}", pullRequest); + return; + } + + final long beginTimestamp = System.currentTimeMillis(); + + PullCallback pullCallback = new PullCallback() { + @Override + public void onSuccess(PullResult pullResult) { + if (pullResult != null) { + pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, + subscriptionData); + + switch (pullResult.getPullStatus()) { + case FOUND: + long prevRequestOffset = pullRequest.getNextOffset(); + pullRequest.setNextOffset(pullResult.getNextBeginOffset()); + long pullRT = System.currentTimeMillis() - beginTimestamp; + DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), + pullRequest.getMessageQueue().getTopic(), pullRT); + + long firstMsgOffset = Long.MAX_VALUE; + if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { + DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); + } else { + firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); + + DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), + pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); + + boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); + 
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// + pullResult.getMsgFoundList(), // + processQueue, // + pullRequest.getMessageQueue(), // + dispathToConsume); + + if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { + DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, + DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); + } else { + DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); + } + } + + if (pullResult.getNextBeginOffset() < prevRequestOffset// + || firstMsgOffset < prevRequestOffset) { + log.warn( + "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", // + pullResult.getNextBeginOffset(), // + firstMsgOffset, // + prevRequestOffset); + } + + break; + case NO_NEW_MSG: + pullRequest.setNextOffset(pullResult.getNextBeginOffset()); + + DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); + + DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); + break; + case NO_MATCHED_MSG: + pullRequest.setNextOffset(pullResult.getNextBeginOffset()); + + DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); + + DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); + break; + case OFFSET_ILLEGAL: + log.warn("the pull request offset illegal, {} {}", // + pullRequest.toString(), pullResult.toString()); + pullRequest.setNextOffset(pullResult.getNextBeginOffset()); + + pullRequest.getProcessQueue().setDropped(true); + DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { + + @Override + public void run() { + try { + DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), + pullRequest.getNextOffset(), false); + + DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); + + DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); + + 
log.warn("fix the pull request offset, {}", pullRequest); + } catch (Throwable e) { + log.error("executeTaskLater Exception", e); + } + } + }, 10000); + break; + default: + break; + } + } + } + + + @Override + public void onException(Throwable e) { + if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + log.warn("execute the pull request exception", e); + } + + DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + } + }; + + boolean commitOffsetEnable = false; + long commitOffsetValue = 0L; + if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { + commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); + if (commitOffsetValue > 0) { + commitOffsetEnable = true; + } + } + + String subExpression = null; + boolean classFilter = false; + SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); + if (sd != null) { + if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { + subExpression = sd.getSubString(); + } + + classFilter = sd.isClassFilterMode(); + } + + int sysFlag = PullSysFlag.buildSysFlag(// + commitOffsetEnable, // commitOffset + true, // suspend + subExpression != null, // subscription + classFilter // class filter + ); + try { + this.pullAPIWrapper.pullKernelImpl(// + pullRequest.getMessageQueue(), // 1 + subExpression, // 2 + subscriptionData.getSubVersion(), // 3 + pullRequest.getNextOffset(), // 4 + this.defaultMQPushConsumer.getPullBatchSize(), // 5 + sysFlag, // 6 + commitOffsetValue, // 7 + BROKER_SUSPEND_MAX_TIME_MILLIS, // 8 + CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9 + CommunicationMode.ASYNC, // 10 + pullCallback// 11 + ); + } catch (Exception e) { + log.error("pullKernelImpl exception", e); + this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); + } + } + + 
private void makeSureStateOK() throws MQClientException { + if (this.serviceState != ServiceState.RUNNING) { + throw new MQClientException("The consumer service state not OK, "// + + this.serviceState// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); + } + } + + private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { + this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay); + } + + public boolean isPause() { + return pause; + } + + public void setPause(boolean pause) { + this.pause = pause; + } + + public ConsumerStatsManager getConsumerStatsManager() { + return this.mQClientFactory.getConsumerStatsManager(); + } + + public void executePullRequestImmediately(final PullRequest pullRequest) { + this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); + } + + private void correctTagsOffset(final PullRequest pullRequest) { + if (0L == pullRequest.getProcessQueue().getMsgCount().get()) { + this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true); + } + } + + public void executeTaskLater(final Runnable r, final long timeDelay) { + this.mQClientFactory.getPullMessageService().executeTaskLater(r, timeDelay); + } + + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); + } + + public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException, + InterruptedException { + return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); + } + + + public void registerMessageListener(MessageListener messageListener) { + this.messageListenerInner = messageListener; + } + + public void resume() { + this.pause = false; + doRebalance(); + log.info("resume this consumer, {}", 
this.defaultMQPushConsumer.getConsumerGroup()); + } + + public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + try { + String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) + : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); + this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, + this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); + } catch (Exception e) { + log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); + + Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); + + String originMsgId = MessageAccessor.getOriginMessageId(msg); + MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); + + newMsg.setFlag(msg.getFlag()); + MessageAccessor.setProperties(newMsg, msg.getProperties()); + MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); + MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); + MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); + newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); + + this.mQClientFactory.getDefaultMQProducer().send(newMsg); + } + } + + private int getMaxReconsumeTimes() { + // default reconsume times: 16 + if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) { + return 16; + } else { + return this.defaultMQPushConsumer.getMaxReconsumeTimes(); + } + } + + public void shutdown() { + switch (this.serviceState) { + case CREATE_JUST: + break; + case RUNNING: + this.consumeMessageService.shutdown(); + this.persistConsumerOffset(); + this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup()); + 
this.mQClientFactory.shutdown(); + log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup()); + this.rebalanceImpl.destroy(); + this.serviceState = ServiceState.SHUTDOWN_ALREADY; + break; + case SHUTDOWN_ALREADY: + break; + default: + break; + } + } + + public void start() throws MQClientException { + switch (this.serviceState) { + case CREATE_JUST: + log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), + this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); + this.serviceState = ServiceState.START_FAILED; + + this.checkConfig(); + + this.copySubscription(); + + if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { + this.defaultMQPushConsumer.changeInstanceNameToPID(); + } + + this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); + + this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); + this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); + this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); + this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); + + this.pullAPIWrapper = new PullAPIWrapper( + mQClientFactory, + this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); + this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); + + if (this.defaultMQPushConsumer.getOffsetStore() != null) { + this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); + } else { + switch (this.defaultMQPushConsumer.getMessageModel()) { + case BROADCASTING: + this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); + break; + case CLUSTERING: + this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, 
this.defaultMQPushConsumer.getConsumerGroup()); + break; + default: + break; + } + } + this.offsetStore.load(); + + if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { + this.consumeOrderly = true; + this.consumeMessageService = + new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); + } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { + this.consumeOrderly = false; + this.consumeMessageService = + new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); + } + + this.consumeMessageService.start(); + + boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); + if (!registerOK) { + this.serviceState = ServiceState.CREATE_JUST; + this.consumeMessageService.shutdown(); + throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + + "] has been created before, specify another name please." 
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), + null); + } + + mQClientFactory.start(); + log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); + this.serviceState = ServiceState.RUNNING; + break; + case RUNNING: + case START_FAILED: + case SHUTDOWN_ALREADY: + throw new MQClientException("The PushConsumer service state not OK, maybe started once, "// + + this.serviceState// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); + default: + break; + } + + this.updateTopicSubscribeInfoWhenSubscriptionChanged(); + + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + + this.mQClientFactory.rebalanceImmediately(); + } + + private void checkConfig() throws MQClientException { + Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup()); + + if (null == this.defaultMQPushConsumer.getConsumerGroup()) { + throw new MQClientException( + "consumerGroup is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { + throw new MQClientException( + "consumerGroup can not equal " + + MixAll.DEFAULT_CONSUMER_GROUP + + ", please specify another one." 
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + if (null == this.defaultMQPushConsumer.getMessageModel()) { + throw new MQClientException( + "messageModel is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) { + throw new MQClientException( + "consumeFromWhere is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYY_MMDD_HHMMSS); + if (null == dt) { + throw new MQClientException( + "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // allocateMessageQueueStrategy + if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) { + throw new MQClientException( + "allocateMessageQueueStrategy is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // subscription + if (null == this.defaultMQPushConsumer.getSubscription()) { + throw new MQClientException( + "subscription is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // messageListener + if (null == this.defaultMQPushConsumer.getMessageListener()) { + throw new MQClientException( + "messageListener is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly; + boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently; + if (!orderly && !concurrently) { + throw new MQClientException( + "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // consumeThreadMin + if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1 + || 
this.defaultMQPushConsumer.getConsumeThreadMin() > 1000 + || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) { + throw new MQClientException( + "consumeThreadMin Out of range [1, 1000]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // consumeThreadMax + if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) { + throw new MQClientException( + "consumeThreadMax Out of range [1, 1000]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // consumeConcurrentlyMaxSpan + if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1 + || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) { + throw new MQClientException( + "consumeConcurrentlyMaxSpan Out of range [1, 65535]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // pullThresholdForQueue + if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) { + throw new MQClientException( + "pullThresholdForQueue Out of range [1, 65535]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // pullInterval + if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) { + throw new MQClientException( + "pullInterval Out of range [0, 65535]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // consumeMessageBatchMaxSize + if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1 + || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) { + throw new MQClientException( + "consumeMessageBatchMaxSize Out of range [1, 1024]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // pullBatchSize + if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) 
{ + throw new MQClientException( + "pullBatchSize Out of range [1, 1024]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + } + + private void copySubscription() throws MQClientException { + try { + Map<String, String> sub = this.defaultMQPushConsumer.getSubscription(); + if (sub != null) { + for (final Map.Entry<String, String> entry : sub.entrySet()) { + final String topic = entry.getKey(); + final String subString = entry.getValue(); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + topic, subString); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + } + } + + if (null == this.messageListenerInner) { + this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); + } + + switch (this.defaultMQPushConsumer.getMessageModel()) { + case BROADCASTING: + break; + case CLUSTERING: + final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + retryTopic, SubscriptionData.SUB_ALL); + this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); + break; + default: + break; + } + } catch (Exception e) { + throw new MQClientException("subscription exception", e); + } + } + + public MessageListener getMessageListenerInner() { + return messageListenerInner; + } + + private void updateTopicSubscribeInfoWhenSubscriptionChanged() { + Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); + if (subTable != null) { + for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { + final String topic = entry.getKey(); + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + } + } + } + + public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() { + return this.rebalanceImpl.getSubscriptionInner(); + } + + public void 
subscribe(String topic, String subExpression) throws MQClientException { + try { + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + topic, subExpression); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + if (this.mQClientFactory != null) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } + } catch (Exception e) { + throw new MQClientException("subscription exception", e); + } + } + + public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { + try { + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + topic, "*"); + subscriptionData.setSubString(fullClassName); + subscriptionData.setClassFilterMode(true); + subscriptionData.setFilterClassSource(filterClassSource); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + if (this.mQClientFactory != null) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } + + } catch (Exception e) { + throw new MQClientException("subscription exception", e); + } + } + + public void suspend() { + this.pause = true; + log.info("suspend this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup()); + } + + public void unsubscribe(String topic) { + this.rebalanceImpl.getSubscriptionInner().remove(topic); + } + + public void updateConsumeOffset(MessageQueue mq, long offset) { + this.offsetStore.updateOffset(mq, offset, false); + } + + public void updateCorePoolSize(int corePoolSize) { + this.consumeMessageService.updateCorePoolSize(corePoolSize); + } + + public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); + } + + public RebalanceImpl getRebalanceImpl() { + return rebalanceImpl; + } + + public boolean isConsumeOrderly() { + return 
consumeOrderly; + } + + public void setConsumeOrderly(boolean consumeOrderly) { + this.consumeOrderly = consumeOrderly; + } + + public void resetOffsetByTimeStamp(long timeStamp) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) { + Set<MessageQueue> mqs = rebalanceImpl.getTopicSubscribeInfoTable().get(topic); + Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); + if (mqs != null) { + for (MessageQueue mq : mqs) { + long offset = searchOffset(mq, timeStamp); + offsetTable.put(mq, offset); + } + this.mQClientFactory.resetOffset(topic, groupName(), offsetTable); + } + } + } + + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + } + + @Override + public String groupName() { + return this.defaultMQPushConsumer.getConsumerGroup(); + } + + @Override + public MessageModel messageModel() { + return this.defaultMQPushConsumer.getMessageModel(); + } + + @Override + public ConsumeType consumeType() { + return ConsumeType.CONSUME_PASSIVELY; + } + + @Override + public ConsumeFromWhere consumeFromWhere() { + return this.defaultMQPushConsumer.getConsumeFromWhere(); + } + + @Override + public Set<SubscriptionData> subscriptions() { + Set<SubscriptionData> subSet = new HashSet<SubscriptionData>(); + + subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values()); + + return subSet; + } + + @Override + public void doRebalance() { + if (this.rebalanceImpl != null && !this.pause) { + this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); + } + } + + @Override + public void persistConsumerOffset() { + try { + this.makeSureStateOK(); + Set<MessageQueue> mqs = new HashSet<MessageQueue>(); + Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); + if (allocateMq != null) { + mqs.addAll(allocateMq); + } + + 
this.offsetStore.persistAll(mqs); + } catch (Exception e) { + log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); + } + } + + @Override + public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) { + Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); + if (subTable != null) { + if (subTable.containsKey(topic)) { + this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info); + } + } + } + + @Override + public boolean isSubscribeTopicNeedUpdate(String topic) { + Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); + if (subTable != null) { + if (subTable.containsKey(topic)) { + return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic); + } + } + + return false; + } + + @Override + public boolean isUnitMode() { + return this.defaultMQPushConsumer.isUnitMode(); + } + + @Override + public ConsumerRunningInfo consumerRunningInfo() { + ConsumerRunningInfo info = new ConsumerRunningInfo(); + + Properties prop = MixAll.object2Properties(this.defaultMQPushConsumer); + + prop.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly)); + prop.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE, String.valueOf(this.consumeMessageService.getCorePoolSize())); + prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp)); + + info.setProperties(prop); + + Set<SubscriptionData> subSet = this.subscriptions(); + info.getSubscriptionSet().addAll(subSet); + + Iterator<Entry<MessageQueue, ProcessQueue>> it = this.rebalanceImpl.getProcessQueueTable().entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, ProcessQueue> next = it.next(); + MessageQueue mq = next.getKey(); + ProcessQueue pq = next.getValue(); + + ProcessQueueInfo pqinfo = new ProcessQueueInfo(); + pqinfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE)); + 
pq.fillProcessQueueInfo(pqinfo); + info.getMqTable().put(mq, pqinfo); + } + + for (SubscriptionData sd : subSet) { + ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic()); + info.getStatusTable().put(sd.getTopic(), consumeStatus); + } + + return info; + } + + public MQClientInstance getmQClientFactory() { + return mQClientFactory; + } + + public void setmQClientFactory(MQClientInstance mQClientFactory) { + this.mQClientFactory = mQClientFactory; + } + + public ServiceState getServiceState() { + return serviceState; + } + + public void setServiceState(ServiceState serviceState) { + this.serviceState = serviceState; + } + + public void adjustThreadPool() { + long computeAccTotal = this.computeAccumulationTotal(); + long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold(); + + long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0); + + long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8); + + if (computeAccTotal >= incThreshold) { + this.consumeMessageService.incCorePoolSize(); + } + + if (computeAccTotal < decThreshold) { + this.consumeMessageService.decCorePoolSize(); + } + } + + private long computeAccumulationTotal() { + long msgAccTotal = 0; + ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable(); + Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, ProcessQueue> next = it.next(); + ProcessQueue value = next.getValue(); + msgAccTotal += value.getMsgAccCnt(); + } + + return msgAccTotal; + } + + public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic) + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + List<QueueTimeSpan> queueTimeSpan = new ArrayList<QueueTimeSpan>(); + TopicRouteData routeData = 
this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000); + for (BrokerData brokerData : routeData.getBrokerDatas()) { + String addr = brokerData.selectBrokerAddr(); + queueTimeSpan.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, groupName(), 3000)); + } + + return queueTimeSpan; + } + + + public ConsumeMessageService getConsumeMessageService() { + return consumeMessageService; + } + + + public void setConsumeMessageService(ConsumeMessageService consumeMessageService) { + this.consumeMessageService = consumeMessageService; + + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java new file mode 100644 index 0000000..1ff430b --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; +import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; + +import java.util.Set; + + +/** + * Consumer inner interface + * + * @author shijia.wxr + */ +public interface MQConsumerInner { + String groupName(); + + + MessageModel messageModel(); + + + ConsumeType consumeType(); + + + ConsumeFromWhere consumeFromWhere(); + + + Set<SubscriptionData> subscriptions(); + + + void doRebalance(); + + + void persistConsumerOffset(); + + + void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info); + + + boolean isSubscribeTopicNeedUpdate(final String topic); + + + boolean isUnitMode(); + + + ConsumerRunningInfo consumerRunningInfo(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java new file mode 100644 index 0000000..9de7ac0 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.common.message.MessageQueue; + +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Message lock,strictly ensure the single queue only one thread at a time consuming + * + * @author shijia.wxr + */ +public class MessageQueueLock { + private ConcurrentHashMap<MessageQueue, Object> mqLockTable = + new ConcurrentHashMap<MessageQueue, Object>(); + + + public Object fetchLockObject(final MessageQueue mq) { + Object objLock = this.mqLockTable.get(mq); + if (null == objLock) { + objLock = new Object(); + Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock); + if (prevLock != null) { + objLock = prevLock; + } + } + + return objLock; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java new file mode 100644 index 0000000..05ffeb7 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java @@ -0,0 +1,451 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.common.message.MessageAccessor; +import com.alibaba.rocketmq.common.message.MessageConst; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.protocol.body.ProcessQueueInfo; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +/** + * Queue consumption snapshot + * + * @author shijia.wxr + */ +public class ProcessQueue { + public final static long REBALANCE_LOCK_MAX_LIVE_TIME = + Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000")); + public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000")); + private final static long PULL_MAX_IDLE_TIME = 
Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000")); + private final Logger log = ClientLogger.getLog(); + private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); + private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); + private final AtomicLong msgCount = new AtomicLong(); + private final Lock lockConsume = new ReentrantLock(); + private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>(); + private final AtomicLong tryUnlockTimes = new AtomicLong(0); + private volatile long queueOffsetMax = 0L; + private volatile boolean dropped = false; + private volatile long lastPullTimestamp = System.currentTimeMillis(); + private volatile long lastConsumeTimestamp = System.currentTimeMillis(); + private volatile boolean locked = false; + private volatile long lastLockTimestamp = System.currentTimeMillis(); + private volatile boolean consuming = false; + private volatile long msgAccCnt = 0; + + public boolean isLockExpired() { + boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; + return result; + } + + + public boolean isPullExpired() { + boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME; + return result; + } + + /** + + * + * @param pushConsumer + */ + public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { + if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { + return; + } + + int loop = msgTreeMap.size() < 16 ? 
msgTreeMap.size() : 16; + for (int i = 0; i < loop; i++) { + MessageExt msg = null; + try { + this.lockTreeMap.readLock().lockInterruptibly(); + try { + if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) { + msg = msgTreeMap.firstEntry().getValue(); + } else { + + break; + } + } finally { + this.lockTreeMap.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("getExpiredMsg exception", e); + } + + try { + + pushConsumer.sendMessageBack(msg, 3); + log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); + try { + this.lockTreeMap.writeLock().lockInterruptibly(); + try { + if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { + try { + msgTreeMap.remove(msgTreeMap.firstKey()); + } catch (Exception e) { + log.error("send expired msg exception", e); + } + } + } finally { + this.lockTreeMap.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("getExpiredMsg exception", e); + } + } catch (Exception e) { + log.error("send expired msg exception", e); + } + } + } + + + public boolean putMessage(final List<MessageExt> msgs) { + boolean dispatchToConsume = false; + try { + this.lockTreeMap.writeLock().lockInterruptibly(); + try { + int validMsgCnt = 0; + for (MessageExt msg : msgs) { + MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg); + if (null == old) { + validMsgCnt++; + this.queueOffsetMax = msg.getQueueOffset(); + } + } + msgCount.addAndGet(validMsgCnt); + + if (!msgTreeMap.isEmpty() && !this.consuming) { + dispatchToConsume = true; + this.consuming = true; + } + + if (!msgs.isEmpty()) { + MessageExt messageExt = msgs.get(msgs.size() - 1); + String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET); + if 
(property != null) { + long accTotal = Long.parseLong(property) - messageExt.getQueueOffset(); + if (accTotal > 0) { + this.msgAccCnt = accTotal; + } + } + } + } finally { + this.lockTreeMap.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("putMessage exception", e); + } + + return dispatchToConsume; + } + + + public long getMaxSpan() { + try { + this.lockTreeMap.readLock().lockInterruptibly(); + try { + if (!this.msgTreeMap.isEmpty()) { + return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey(); + } + } finally { + this.lockTreeMap.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("getMaxSpan exception", e); + } + + return 0; + } + + + public long removeMessage(final List<MessageExt> msgs) { + long result = -1; + final long now = System.currentTimeMillis(); + try { + this.lockTreeMap.writeLock().lockInterruptibly(); + this.lastConsumeTimestamp = now; + try { + if (!msgTreeMap.isEmpty()) { + result = this.queueOffsetMax + 1; + int removedCnt = 0; + for (MessageExt msg : msgs) { + MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); + if (prev != null) { + removedCnt--; + } + } + msgCount.addAndGet(removedCnt); + + if (!msgTreeMap.isEmpty()) { + result = msgTreeMap.firstKey(); + } + } + } finally { + this.lockTreeMap.writeLock().unlock(); + } + } catch (Throwable t) { + log.error("removeMessage exception", t); + } + + return result; + } + + + public TreeMap<Long, MessageExt> getMsgTreeMap() { + return msgTreeMap; + } + + + public AtomicLong getMsgCount() { + return msgCount; + } + + + public boolean isDropped() { + return dropped; + } + + + public void setDropped(boolean dropped) { + this.dropped = dropped; + } + + public boolean isLocked() { + return locked; + } + + public void setLocked(boolean locked) { + this.locked = locked; + } + + public void rollback() { + try { + this.lockTreeMap.writeLock().lockInterruptibly(); + try { + this.msgTreeMap.putAll(this.msgTreeMapTemp); + this.msgTreeMapTemp.clear(); 
+ } finally { + this.lockTreeMap.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("rollback exception", e); + } + } + + + public long commit() { + try { + this.lockTreeMap.writeLock().lockInterruptibly(); + try { + Long offset = this.msgTreeMapTemp.lastKey(); + msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1)); + this.msgTreeMapTemp.clear(); + if (offset != null) { + return offset + 1; + } + } finally { + this.lockTreeMap.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("commit exception", e); + } + + return -1; + } + + + public void makeMessageToCosumeAgain(List<MessageExt> msgs) { + try { + this.lockTreeMap.writeLock().lockInterruptibly(); + try { + for (MessageExt msg : msgs) { + this.msgTreeMapTemp.remove(msg.getQueueOffset()); + this.msgTreeMap.put(msg.getQueueOffset(), msg); + } + } finally { + this.lockTreeMap.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("makeMessageToCosumeAgain exception", e); + } + } + + + public List<MessageExt> takeMessags(final int batchSize) { + List<MessageExt> result = new ArrayList<MessageExt>(batchSize); + final long now = System.currentTimeMillis(); + try { + this.lockTreeMap.writeLock().lockInterruptibly(); + this.lastConsumeTimestamp = now; + try { + if (!this.msgTreeMap.isEmpty()) { + for (int i = 0; i < batchSize; i++) { + Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry(); + if (entry != null) { + result.add(entry.getValue()); + msgTreeMapTemp.put(entry.getKey(), entry.getValue()); + } else { + break; + } + } + } + + if (result.isEmpty()) { + consuming = false; + } + } finally { + this.lockTreeMap.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("take Messages exception", e); + } + + return result; + } + + + public boolean hasTempMessage() { + try { + this.lockTreeMap.readLock().lockInterruptibly(); + try { + return !this.msgTreeMap.isEmpty(); + } finally { + this.lockTreeMap.readLock().unlock(); 
+ } + } catch (InterruptedException e) { + } + + return true; + } + + + public void clear() { + try { + this.lockTreeMap.writeLock().lockInterruptibly(); + try { + this.msgTreeMap.clear(); + this.msgTreeMapTemp.clear(); + this.msgCount.set(0); + this.queueOffsetMax = 0L; + } finally { + this.lockTreeMap.writeLock().unlock(); + } + } catch (InterruptedException e) { + log.error("rollback exception", e); + } + } + + + public long getLastLockTimestamp() { + return lastLockTimestamp; + } + + + public void setLastLockTimestamp(long lastLockTimestamp) { + this.lastLockTimestamp = lastLockTimestamp; + } + + + public Lock getLockConsume() { + return lockConsume; + } + + + public long getLastPullTimestamp() { + return lastPullTimestamp; + } + + + public void setLastPullTimestamp(long lastPullTimestamp) { + this.lastPullTimestamp = lastPullTimestamp; + } + + + public long getMsgAccCnt() { + return msgAccCnt; + } + + + public void setMsgAccCnt(long msgAccCnt) { + this.msgAccCnt = msgAccCnt; + } + + + public long getTryUnlockTimes() { + return this.tryUnlockTimes.get(); + } + + + public void incTryUnlockTimes() { + this.tryUnlockTimes.incrementAndGet(); + } + + + public void fillProcessQueueInfo(final ProcessQueueInfo info) { + try { + this.lockTreeMap.readLock().lockInterruptibly(); + + if (!this.msgTreeMap.isEmpty()) { + info.setCachedMsgMinOffset(this.msgTreeMap.firstKey()); + info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey()); + info.setCachedMsgCount(this.msgTreeMap.size()); + } + + if (!this.msgTreeMapTemp.isEmpty()) { + info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey()); + info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey()); + info.setTransactionMsgCount(this.msgTreeMapTemp.size()); + } + + info.setLocked(this.locked); + info.setTryUnlockTimes(this.tryUnlockTimes.get()); + info.setLastLockTimestamp(this.lastLockTimestamp); + + info.setDroped(this.dropped); + info.setLastPullTimestamp(this.lastPullTimestamp); + 
info.setLastConsumeTimestamp(this.lastConsumeTimestamp); + } catch (Exception e) { + } finally { + this.lockTreeMap.readLock().unlock(); + } + } + + + public long getLastConsumeTimestamp() { + return lastConsumeTimestamp; + } + + + public void setLastConsumeTimestamp(long lastConsumeTimestamp) { + this.lastConsumeTimestamp = lastConsumeTimestamp; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java new file mode 100644 index 0000000..730b090 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.client.consumer.PullCallback; +import com.alibaba.rocketmq.client.consumer.PullResult; +import com.alibaba.rocketmq.client.consumer.PullStatus; +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.hook.FilterMessageContext; +import com.alibaba.rocketmq.client.hook.FilterMessageHook; +import com.alibaba.rocketmq.client.impl.CommunicationMode; +import com.alibaba.rocketmq.client.impl.FindBrokerResult; +import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.message.*; +import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; +import com.alibaba.rocketmq.common.sysflag.PullSysFlag; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * @author shijia.wxr + */ +public class PullAPIWrapper { + private final Logger log = ClientLogger.getLog(); + private final MQClientInstance mQClientFactory; + private final String consumerGroup; + private final boolean unitMode; + private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = + new ConcurrentHashMap<MessageQueue, AtomicLong>(32); + private volatile boolean connectBrokerByUser = false; + private volatile long defaultBrokerId = MixAll.MASTER_ID; + private Random random = new Random(System.currentTimeMillis()); + private 
ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); + + public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) { + this.mQClientFactory = mQClientFactory; + this.consumerGroup = consumerGroup; + this.unitMode = unitMode; + } + + public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, + final SubscriptionData subscriptionData) { + PullResultExt pullResultExt = (PullResultExt) pullResult; + + this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); + if (PullStatus.FOUND == pullResult.getPullStatus()) { + ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); + List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer); + + List<MessageExt> msgListFilterAgain = msgList; + if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { + msgListFilterAgain = new ArrayList<MessageExt>(msgList.size()); + for (MessageExt msg : msgList) { + if (msg.getTags() != null) { + if (subscriptionData.getTagsSet().contains(msg.getTags())) { + msgListFilterAgain.add(msg); + } + } + } + } + + if (this.hasHook()) { + FilterMessageContext filterMessageContext = new FilterMessageContext(); + filterMessageContext.setUnitMode(unitMode); + filterMessageContext.setMsgList(msgListFilterAgain); + this.executeHook(filterMessageContext); + } + + for (MessageExt msg : msgListFilterAgain) { + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, + Long.toString(pullResult.getMinOffset())); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, + Long.toString(pullResult.getMaxOffset())); + } + + pullResultExt.setMsgFoundList(msgListFilterAgain); + } + + pullResultExt.setMessageBinary(null); + + return pullResult; + } + + public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) { + AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); + if (null == suggest) { + 
this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId)); + } else { + suggest.set(brokerId); + } + } + + public boolean hasHook() { + return !this.filterMessageHookList.isEmpty(); + } + + public void executeHook(final FilterMessageContext context) { + if (!this.filterMessageHookList.isEmpty()) { + for (FilterMessageHook hook : this.filterMessageHookList) { + try { + hook.filterMessage(context); + } catch (Throwable e) { + log.error("execute hook error. hookName={}", hook.hookName()); + } + } + } + } + + public PullResult pullKernelImpl( + final MessageQueue mq, + final String subExpression, + final long subVersion, + final long offset, + final int maxNums, + final int sysFlag, + final long commitOffset, + final long brokerSuspendMaxTimeMillis, + final long timeoutMillis, + final CommunicationMode communicationMode, + final PullCallback pullCallback + ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + FindBrokerResult findBrokerResult = + this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), + this.recalculatePullFromWhichNode(mq), false); + if (null == findBrokerResult) { + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); + findBrokerResult = + this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), + this.recalculatePullFromWhichNode(mq), false); + } + + if (findBrokerResult != null) { + int sysFlagInner = sysFlag; + + if (findBrokerResult.isSlave()) { + sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); + } + + PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); + requestHeader.setConsumerGroup(this.consumerGroup); + requestHeader.setTopic(mq.getTopic()); + requestHeader.setQueueId(mq.getQueueId()); + requestHeader.setQueueOffset(offset); + requestHeader.setMaxMsgNums(maxNums); + requestHeader.setSysFlag(sysFlagInner); + requestHeader.setCommitOffset(commitOffset); + requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); 
+ requestHeader.setSubscription(subExpression); + requestHeader.setSubVersion(subVersion); + + String brokerAddr = findBrokerResult.getBrokerAddr(); + if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { + brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); + } + + PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( + brokerAddr, + requestHeader, + timeoutMillis, + communicationMode, + pullCallback); + + return pullResult; + } + + throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); + } + + public long recalculatePullFromWhichNode(final MessageQueue mq) { + if (this.isConnectBrokerByUser()) { + return this.defaultBrokerId; + } + + AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); + if (suggest != null) { + return suggest.get(); + } + + return MixAll.MASTER_ID; + } + + private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) + throws MQClientException { + ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable(); + if (topicRouteTable != null) { + TopicRouteData topicRouteData = topicRouteTable.get(topic); + List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr); + + if (list != null && !list.isEmpty()) { + return list.get(randomNum() % list.size()); + } + } + + throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: " + + topic, null); + } + + public boolean isConnectBrokerByUser() { + return connectBrokerByUser; + } + + public int randomNum() { + int value = random.nextInt(); + if (value < 0) { + value = Math.abs(value); + if (value < 0) + value = 0; + } + return value; + } + + public void setConnectBrokerByUser(boolean connectBrokerByUser) { + this.connectBrokerByUser = connectBrokerByUser; + + } + + public void registerFilterMessageHook(ArrayList<FilterMessageHook> filterMessageHookList) { + this.filterMessageHookList = 
filterMessageHookList; + } + + public long getDefaultBrokerId() { + return defaultBrokerId; + } + + public void setDefaultBrokerId(long defaultBrokerId) { + this.defaultBrokerId = defaultBrokerId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java new file mode 100644 index 0000000..161a039 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package com.alibaba.rocketmq.client.impl.consumer;

import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.common.ServiceThread;
import org.slf4j.Logger;

import java.util.concurrent.*;


/**
 * Service thread that takes {@link PullRequest}s from an internal queue one at a time and
 * hands each to the consumer registered for the request's consumer group.
 * <p>
 * Requests can be queued immediately or after a delay; delayed work runs on a dedicated
 * single-threaded scheduler named {@code PullMessageServiceScheduledThread}.
 * <p>
 * NOTE(review): nothing in this class shuts the scheduler down; confirm that the owner of
 * this service does so on shutdown.
 *
 * @author shijia.wxr
 */
public class PullMessageService extends ServiceThread {
    private final Logger log = ClientLogger.getLog();
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
    private final MQClientInstance mQClientFactory;
    private final ScheduledExecutorService scheduledExecutorService = Executors
        .newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "PullMessageServiceScheduledThread");
            }
        });

    /**
     * @param mQClientFactory client instance used to look up the consumer for each pull request
     */
    public PullMessageService(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    /**
     * Queues the pull request after the given delay.
     *
     * @param pullRequest the request to enqueue
     * @param timeDelay   delay in milliseconds before the request is enqueued
     */
    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    }

    /**
     * Puts the pull request on the queue consumed by {@link #run()}.
     * If the calling thread is interrupted while enqueueing, the failure is logged, the
     * request is not enqueued, and the thread's interrupt status is restored.
     *
     * @param pullRequest the request to enqueue
     */
    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
            // put() cleared the interrupt flag when it threw; restore it so the caller
            // (or its executor) can still observe that an interrupt was requested.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules an arbitrary task on this service's scheduler.
     *
     * @param r         the task to run
     * @param timeDelay delay in milliseconds before the task runs
     */
    public void executeTaskLater(final Runnable r, final long timeDelay) {
        this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
    }

    /**
     * @return the scheduler used for delayed pull requests and tasks
     */
    public ScheduledExecutorService getScheduledExecutorService() {
        return scheduledExecutorService;
    }

    /**
     * Dispatches one pull request to the consumer selected by the request's consumer group,
     * or logs a warning and drops the request when no consumer is found.
     */
    private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }


    /**
     * Main loop: blocks on the request queue and dispatches each request until
     * {@code isStopped()} returns true. Any other exception is logged and the loop continues.
     */
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                // take() blocks until a request is available and never returns null.
                this.pullMessage(this.pullRequestQueue.take());
            } catch (InterruptedException ignored) {
                // Deliberately not re-interrupting here: the loop condition re-checks
                // isStopped(), and a re-set flag would make the next take() throw at once.
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }


    @Override
    public String getServiceName() {
        return PullMessageService.class.getSimpleName();
    }


}
