http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java deleted file mode 100644 index 82c342f..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ /dev/null @@ -1,1071 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.client.QueryResult; -import com.alibaba.rocketmq.client.Validators; -import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; -import com.alibaba.rocketmq.client.consumer.PullCallback; -import com.alibaba.rocketmq.client.consumer.PullResult; -import com.alibaba.rocketmq.client.consumer.listener.MessageListener; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; -import com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore; -import com.alibaba.rocketmq.client.consumer.store.OffsetStore; -import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType; -import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.hook.ConsumeMessageContext; -import com.alibaba.rocketmq.client.hook.ConsumeMessageHook; -import com.alibaba.rocketmq.client.hook.FilterMessageHook; -import com.alibaba.rocketmq.client.impl.CommunicationMode; -import com.alibaba.rocketmq.client.impl.MQClientManager; -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.client.stat.ConsumerStatsManager; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.ServiceState; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.filter.FilterAPI; -import com.alibaba.rocketmq.common.help.FAQUrl; -import com.alibaba.rocketmq.common.message.*; -import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus; -import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; -import 
com.alibaba.rocketmq.common.protocol.body.ProcessQueueInfo; -import com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; -import com.alibaba.rocketmq.common.protocol.route.BrokerData; -import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; -import com.alibaba.rocketmq.common.sysflag.PullSysFlag; -import com.alibaba.rocketmq.remoting.RPCHook; -import com.alibaba.rocketmq.remoting.common.RemotingHelper; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import org.slf4j.Logger; - -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * @author shijia.wxr - */ -public class DefaultMQPushConsumerImpl implements MQConsumerInner { - /** - * Delay some time when exception occur - */ - private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000; - /** - * Flow control interval - */ - private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; - /** - * Delay some time when suspend pull service - */ - private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000; - private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15; - private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30; - private final Logger log = ClientLogger.getLog(); - private final DefaultMQPushConsumer defaultMQPushConsumer; - private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); - private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); - private final long consumerStartTimestamp = System.currentTimeMillis(); - private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>(); - private final RPCHook rpcHook; - private ServiceState serviceState = 
ServiceState.CREATE_JUST; - private MQClientInstance mQClientFactory; - private PullAPIWrapper pullAPIWrapper; - private volatile boolean pause = false; - private boolean consumeOrderly = false; - private MessageListener messageListenerInner; - private OffsetStore offsetStore; - private ConsumeMessageService consumeMessageService; - private long flowControlTimes1 = 0; - private long flowControlTimes2 = 0; - - - public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { - this.defaultMQPushConsumer = defaultMQPushConsumer; - this.rpcHook = rpcHook; - } - - public void registerFilterMessageHook(final FilterMessageHook hook) { - this.filterMessageHookList.add(hook); - log.info("register FilterMessageHook Hook, {}", hook.hookName()); - } - - public boolean hasHook() { - return !this.consumeMessageHookList.isEmpty(); - } - - public void registerConsumeMessageHook(final ConsumeMessageHook hook) { - this.consumeMessageHookList.add(hook); - log.info("register consumeMessageHook Hook, {}", hook.hookName()); - } - - public void executeHookBefore(final ConsumeMessageContext context) { - if (!this.consumeMessageHookList.isEmpty()) { - for (ConsumeMessageHook hook : this.consumeMessageHookList) { - try { - hook.consumeMessageBefore(context); - } catch (Throwable e) { - } - } - } - } - - public void executeHookAfter(final ConsumeMessageContext context) { - if (!this.consumeMessageHookList.isEmpty()) { - for (ConsumeMessageHook hook : this.consumeMessageHookList) { - try { - hook.consumeMessageAfter(context); - } catch (Throwable e) { - } - } - } - } - - public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { - createTopic(key, newTopic, queueNum, 0); - } - - public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { - this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag); - } - - public Set<MessageQueue> 
fetchSubscribeMessageQueues(String topic) throws MQClientException { - Set<MessageQueue> result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic); - if (null == result) { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); - result = this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic); - } - - if (null == result) { - throw new MQClientException("The topic[" + topic + "] not exist", null); - } - - return result; - } - - public DefaultMQPushConsumer getDefaultMQPushConsumer() { - return defaultMQPushConsumer; - } - - public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { - return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq); - } - - public long maxOffset(MessageQueue mq) throws MQClientException { - return this.mQClientFactory.getMQAdminImpl().maxOffset(mq); - } - - public long minOffset(MessageQueue mq) throws MQClientException { - return this.mQClientFactory.getMQAdminImpl().minOffset(mq); - } - - public OffsetStore getOffsetStore() { - return offsetStore; - } - - public void setOffsetStore(OffsetStore offsetStore) { - this.offsetStore = offsetStore; - } - - public void pullMessage(final PullRequest pullRequest) { - final ProcessQueue processQueue = pullRequest.getProcessQueue(); - if (processQueue.isDropped()) { - log.info("the pull request[{}] is dropped.", pullRequest.toString()); - return; - } - - pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); - - try { - this.makeSureStateOK(); - } catch (MQClientException e) { - log.warn("pullMessage exception, consumer state not ok", e); - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); - return; - } - - if (this.isPause()) { - log.warn("consumer was paused, execute pull request later. 
instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); - return; - } - - long size = processQueue.getMsgCount().get(); - if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); - if ((flowControlTimes1++ % 1000) == 0) { - log.warn( - "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}", - processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1); - } - return; - } - - if (!this.consumeOrderly) { - if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); - if ((flowControlTimes2++ % 1000) == 0) { - log.warn( - "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", - processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), - pullRequest, flowControlTimes2); - } - return; - } - } else { - if (processQueue.isLocked()) { - if (!pullRequest.isLockedFirst()) { - final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); - boolean brokerBusy = offset < pullRequest.getNextOffset(); - log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", - pullRequest, offset, brokerBusy); - if (brokerBusy) { - log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. 
pullRequest: {} NewOffset: {}", - pullRequest, offset); - } - - pullRequest.setLockedFirst(true); - pullRequest.setNextOffset(offset); - } - } else { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); - log.info("pull message later because not locked in broker, {}", pullRequest); - return; - } - } - - final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); - if (null == subscriptionData) { - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); - log.warn("find the consumer's subscription failed, {}", pullRequest); - return; - } - - final long beginTimestamp = System.currentTimeMillis(); - - PullCallback pullCallback = new PullCallback() { - @Override - public void onSuccess(PullResult pullResult) { - if (pullResult != null) { - pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, - subscriptionData); - - switch (pullResult.getPullStatus()) { - case FOUND: - long prevRequestOffset = pullRequest.getNextOffset(); - pullRequest.setNextOffset(pullResult.getNextBeginOffset()); - long pullRT = System.currentTimeMillis() - beginTimestamp; - DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), - pullRequest.getMessageQueue().getTopic(), pullRT); - - long firstMsgOffset = Long.MAX_VALUE; - if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { - DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); - } else { - firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); - - DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), - pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); - - boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); - 
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// - pullResult.getMsgFoundList(), // - processQueue, // - pullRequest.getMessageQueue(), // - dispathToConsume); - - if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { - DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, - DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); - } else { - DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); - } - } - - if (pullResult.getNextBeginOffset() < prevRequestOffset// - || firstMsgOffset < prevRequestOffset) { - log.warn( - "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", // - pullResult.getNextBeginOffset(), // - firstMsgOffset, // - prevRequestOffset); - } - - break; - case NO_NEW_MSG: - pullRequest.setNextOffset(pullResult.getNextBeginOffset()); - - DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); - - DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); - break; - case NO_MATCHED_MSG: - pullRequest.setNextOffset(pullResult.getNextBeginOffset()); - - DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); - - DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); - break; - case OFFSET_ILLEGAL: - log.warn("the pull request offset illegal, {} {}", // - pullRequest.toString(), pullResult.toString()); - pullRequest.setNextOffset(pullResult.getNextBeginOffset()); - - pullRequest.getProcessQueue().setDropped(true); - DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { - - @Override - public void run() { - try { - DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), - pullRequest.getNextOffset(), false); - - DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); - - DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); - - 
log.warn("fix the pull request offset, {}", pullRequest); - } catch (Throwable e) { - log.error("executeTaskLater Exception", e); - } - } - }, 10000); - break; - default: - break; - } - } - } - - - @Override - public void onException(Throwable e) { - if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - log.warn("execute the pull request exception", e); - } - - DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); - } - }; - - boolean commitOffsetEnable = false; - long commitOffsetValue = 0L; - if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { - commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); - if (commitOffsetValue > 0) { - commitOffsetEnable = true; - } - } - - String subExpression = null; - boolean classFilter = false; - SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); - if (sd != null) { - if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { - subExpression = sd.getSubString(); - } - - classFilter = sd.isClassFilterMode(); - } - - int sysFlag = PullSysFlag.buildSysFlag(// - commitOffsetEnable, // commitOffset - true, // suspend - subExpression != null, // subscription - classFilter // class filter - ); - try { - this.pullAPIWrapper.pullKernelImpl(// - pullRequest.getMessageQueue(), // 1 - subExpression, // 2 - subscriptionData.getSubVersion(), // 3 - pullRequest.getNextOffset(), // 4 - this.defaultMQPushConsumer.getPullBatchSize(), // 5 - sysFlag, // 6 - commitOffsetValue, // 7 - BROKER_SUSPEND_MAX_TIME_MILLIS, // 8 - CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9 - CommunicationMode.ASYNC, // 10 - pullCallback// 11 - ); - } catch (Exception e) { - log.error("pullKernelImpl exception", e); - this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); - } - } - - 
private void makeSureStateOK() throws MQClientException { - if (this.serviceState != ServiceState.RUNNING) { - throw new MQClientException("The consumer service state not OK, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); - } - } - - private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { - this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay); - } - - public boolean isPause() { - return pause; - } - - public void setPause(boolean pause) { - this.pause = pause; - } - - public ConsumerStatsManager getConsumerStatsManager() { - return this.mQClientFactory.getConsumerStatsManager(); - } - - public void executePullRequestImmediately(final PullRequest pullRequest) { - this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); - } - - private void correctTagsOffset(final PullRequest pullRequest) { - if (0L == pullRequest.getProcessQueue().getMsgCount().get()) { - this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true); - } - } - - public void executeTaskLater(final Runnable r, final long timeDelay) { - this.mQClientFactory.getPullMessageService().executeTaskLater(r, timeDelay); - } - - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { - return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); - } - - public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException, - InterruptedException { - return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); - } - - - public void registerMessageListener(MessageListener messageListener) { - this.messageListenerInner = messageListener; - } - - public void resume() { - this.pause = false; - doRebalance(); - log.info("resume this consumer, {}", 
this.defaultMQPushConsumer.getConsumerGroup()); - } - - public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - try { - String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) - : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, - this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); - } catch (Exception e) { - log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); - - Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); - - String originMsgId = MessageAccessor.getOriginMessageId(msg); - MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); - - newMsg.setFlag(msg.getFlag()); - MessageAccessor.setProperties(newMsg, msg.getProperties()); - MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); - MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); - MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); - newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); - - this.mQClientFactory.getDefaultMQProducer().send(newMsg); - } - } - - private int getMaxReconsumeTimes() { - // default reconsume times: 16 - if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) { - return 16; - } else { - return this.defaultMQPushConsumer.getMaxReconsumeTimes(); - } - } - - public void shutdown() { - switch (this.serviceState) { - case CREATE_JUST: - break; - case RUNNING: - this.consumeMessageService.shutdown(); - this.persistConsumerOffset(); - this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup()); - 
this.mQClientFactory.shutdown(); - log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup()); - this.rebalanceImpl.destroy(); - this.serviceState = ServiceState.SHUTDOWN_ALREADY; - break; - case SHUTDOWN_ALREADY: - break; - default: - break; - } - } - - public void start() throws MQClientException { - switch (this.serviceState) { - case CREATE_JUST: - log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), - this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); - this.serviceState = ServiceState.START_FAILED; - - this.checkConfig(); - - this.copySubscription(); - - if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { - this.defaultMQPushConsumer.changeInstanceNameToPID(); - } - - this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); - - this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); - this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); - this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); - this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); - - this.pullAPIWrapper = new PullAPIWrapper( - mQClientFactory, - this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); - this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); - - if (this.defaultMQPushConsumer.getOffsetStore() != null) { - this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); - } else { - switch (this.defaultMQPushConsumer.getMessageModel()) { - case BROADCASTING: - this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); - break; - case CLUSTERING: - this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, 
this.defaultMQPushConsumer.getConsumerGroup()); - break; - default: - break; - } - } - this.offsetStore.load(); - - if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { - this.consumeOrderly = true; - this.consumeMessageService = - new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); - } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { - this.consumeOrderly = false; - this.consumeMessageService = - new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); - } - - this.consumeMessageService.start(); - - boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); - if (!registerOK) { - this.serviceState = ServiceState.CREATE_JUST; - this.consumeMessageService.shutdown(); - throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() - + "] has been created before, specify another name please." 
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), - null); - } - - mQClientFactory.start(); - log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); - this.serviceState = ServiceState.RUNNING; - break; - case RUNNING: - case START_FAILED: - case SHUTDOWN_ALREADY: - throw new MQClientException("The PushConsumer service state not OK, maybe started once, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); - default: - break; - } - - this.updateTopicSubscribeInfoWhenSubscriptionChanged(); - - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); - - this.mQClientFactory.rebalanceImmediately(); - } - - private void checkConfig() throws MQClientException { - Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup()); - - if (null == this.defaultMQPushConsumer.getConsumerGroup()) { - throw new MQClientException( - "consumerGroup is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { - throw new MQClientException( - "consumerGroup can not equal " - + MixAll.DEFAULT_CONSUMER_GROUP - + ", please specify another one." 
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - if (null == this.defaultMQPushConsumer.getMessageModel()) { - throw new MQClientException( - "messageModel is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) { - throw new MQClientException( - "consumeFromWhere is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYY_MMDD_HHMMSS); - if (null == dt) { - throw new MQClientException( - "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - // allocateMessageQueueStrategy - if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) { - throw new MQClientException( - "allocateMessageQueueStrategy is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - // subscription - if (null == this.defaultMQPushConsumer.getSubscription()) { - throw new MQClientException( - "subscription is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - // messageListener - if (null == this.defaultMQPushConsumer.getMessageListener()) { - throw new MQClientException( - "messageListener is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly; - boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently; - if (!orderly && !concurrently) { - throw new MQClientException( - "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - // consumeThreadMin - if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1 - || 
this.defaultMQPushConsumer.getConsumeThreadMin() > 1000 - || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) { - throw new MQClientException( - "consumeThreadMin Out of range [1, 1000]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - // consumeThreadMax - if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) { - throw new MQClientException( - "consumeThreadMax Out of range [1, 1000]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - // consumeConcurrentlyMaxSpan - if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1 - || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) { - throw new MQClientException( - "consumeConcurrentlyMaxSpan Out of range [1, 65535]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - // pullThresholdForQueue - if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) { - throw new MQClientException( - "pullThresholdForQueue Out of range [1, 65535]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - // pullInterval - if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) { - throw new MQClientException( - "pullInterval Out of range [0, 65535]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - // consumeMessageBatchMaxSize - if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1 - || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) { - throw new MQClientException( - "consumeMessageBatchMaxSize Out of range [1, 1024]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - - // pullBatchSize - if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) 
{ - throw new MQClientException( - "pullBatchSize Out of range [1, 1024]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); - } - } - - private void copySubscription() throws MQClientException { - try { - Map<String, String> sub = this.defaultMQPushConsumer.getSubscription(); - if (sub != null) { - for (final Map.Entry<String, String> entry : sub.entrySet()) { - final String topic = entry.getKey(); - final String subString = entry.getValue(); - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // - topic, subString); - this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); - } - } - - if (null == this.messageListenerInner) { - this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); - } - - switch (this.defaultMQPushConsumer.getMessageModel()) { - case BROADCASTING: - break; - case CLUSTERING: - final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // - retryTopic, SubscriptionData.SUB_ALL); - this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); - break; - default: - break; - } - } catch (Exception e) { - throw new MQClientException("subscription exception", e); - } - } - - public MessageListener getMessageListenerInner() { - return messageListenerInner; - } - - private void updateTopicSubscribeInfoWhenSubscriptionChanged() { - Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); - if (subTable != null) { - for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { - final String topic = entry.getKey(); - this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); - } - } - } - - public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() { - return this.rebalanceImpl.getSubscriptionInner(); - } - - public void 
subscribe(String topic, String subExpression) throws MQClientException { - try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // - topic, subExpression); - this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); - if (this.mQClientFactory != null) { - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); - } - } catch (Exception e) { - throw new MQClientException("subscription exception", e); - } - } - - public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { - try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // - topic, "*"); - subscriptionData.setSubString(fullClassName); - subscriptionData.setClassFilterMode(true); - subscriptionData.setFilterClassSource(filterClassSource); - this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); - if (this.mQClientFactory != null) { - this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); - } - - } catch (Exception e) { - throw new MQClientException("subscription exception", e); - } - } - - public void suspend() { - this.pause = true; - log.info("suspend this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup()); - } - - public void unsubscribe(String topic) { - this.rebalanceImpl.getSubscriptionInner().remove(topic); - } - - public void updateConsumeOffset(MessageQueue mq, long offset) { - this.offsetStore.updateOffset(mq, offset, false); - } - - public void updateCorePoolSize(int corePoolSize) { - this.consumeMessageService.updateCorePoolSize(corePoolSize); - } - - public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId); - } - - public RebalanceImpl getRebalanceImpl() { - return rebalanceImpl; - } - - public boolean isConsumeOrderly() { - return 
consumeOrderly; - } - - public void setConsumeOrderly(boolean consumeOrderly) { - this.consumeOrderly = consumeOrderly; - } - - public void resetOffsetByTimeStamp(long timeStamp) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) { - Set<MessageQueue> mqs = rebalanceImpl.getTopicSubscribeInfoTable().get(topic); - Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); - if (mqs != null) { - for (MessageQueue mq : mqs) { - long offset = searchOffset(mq, timeStamp); - offsetTable.put(mq, offset); - } - this.mQClientFactory.resetOffset(topic, groupName(), offsetTable); - } - } - } - - public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { - return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); - } - - @Override - public String groupName() { - return this.defaultMQPushConsumer.getConsumerGroup(); - } - - @Override - public MessageModel messageModel() { - return this.defaultMQPushConsumer.getMessageModel(); - } - - @Override - public ConsumeType consumeType() { - return ConsumeType.CONSUME_PASSIVELY; - } - - @Override - public ConsumeFromWhere consumeFromWhere() { - return this.defaultMQPushConsumer.getConsumeFromWhere(); - } - - @Override - public Set<SubscriptionData> subscriptions() { - Set<SubscriptionData> subSet = new HashSet<SubscriptionData>(); - - subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values()); - - return subSet; - } - - @Override - public void doRebalance() { - if (this.rebalanceImpl != null && !this.pause) { - this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); - } - } - - @Override - public void persistConsumerOffset() { - try { - this.makeSureStateOK(); - Set<MessageQueue> mqs = new HashSet<MessageQueue>(); - Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); - if (allocateMq != null) { - mqs.addAll(allocateMq); - } - - 
this.offsetStore.persistAll(mqs); - } catch (Exception e) { - log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); - } - } - - @Override - public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) { - Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); - if (subTable != null) { - if (subTable.containsKey(topic)) { - this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info); - } - } - } - - @Override - public boolean isSubscribeTopicNeedUpdate(String topic) { - Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); - if (subTable != null) { - if (subTable.containsKey(topic)) { - return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic); - } - } - - return false; - } - - @Override - public boolean isUnitMode() { - return this.defaultMQPushConsumer.isUnitMode(); - } - - @Override - public ConsumerRunningInfo consumerRunningInfo() { - ConsumerRunningInfo info = new ConsumerRunningInfo(); - - Properties prop = MixAll.object2Properties(this.defaultMQPushConsumer); - - prop.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly)); - prop.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE, String.valueOf(this.consumeMessageService.getCorePoolSize())); - prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp)); - - info.setProperties(prop); - - Set<SubscriptionData> subSet = this.subscriptions(); - info.getSubscriptionSet().addAll(subSet); - - Iterator<Entry<MessageQueue, ProcessQueue>> it = this.rebalanceImpl.getProcessQueueTable().entrySet().iterator(); - while (it.hasNext()) { - Entry<MessageQueue, ProcessQueue> next = it.next(); - MessageQueue mq = next.getKey(); - ProcessQueue pq = next.getValue(); - - ProcessQueueInfo pqinfo = new ProcessQueueInfo(); - pqinfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE)); - 
pq.fillProcessQueueInfo(pqinfo); - info.getMqTable().put(mq, pqinfo); - } - - for (SubscriptionData sd : subSet) { - ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic()); - info.getStatusTable().put(sd.getTopic(), consumeStatus); - } - - return info; - } - - public MQClientInstance getmQClientFactory() { - return mQClientFactory; - } - - public void setmQClientFactory(MQClientInstance mQClientFactory) { - this.mQClientFactory = mQClientFactory; - } - - public ServiceState getServiceState() { - return serviceState; - } - - public void setServiceState(ServiceState serviceState) { - this.serviceState = serviceState; - } - - public void adjustThreadPool() { - long computeAccTotal = this.computeAccumulationTotal(); - long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold(); - - long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0); - - long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8); - - if (computeAccTotal >= incThreshold) { - this.consumeMessageService.incCorePoolSize(); - } - - if (computeAccTotal < decThreshold) { - this.consumeMessageService.decCorePoolSize(); - } - } - - private long computeAccumulationTotal() { - long msgAccTotal = 0; - ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable(); - Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<MessageQueue, ProcessQueue> next = it.next(); - ProcessQueue value = next.getValue(); - msgAccTotal += value.getMsgAccCnt(); - } - - return msgAccTotal; - } - - public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic) - throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - List<QueueTimeSpan> queueTimeSpan = new ArrayList<QueueTimeSpan>(); - TopicRouteData routeData = 
this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000); - for (BrokerData brokerData : routeData.getBrokerDatas()) { - String addr = brokerData.selectBrokerAddr(); - queueTimeSpan.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, groupName(), 3000)); - } - - return queueTimeSpan; - } - - - public ConsumeMessageService getConsumeMessageService() { - return consumeMessageService; - } - - - public void setConsumeMessageService(ConsumeMessageService consumeMessageService) { - this.consumeMessageService = consumeMessageService; - - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java deleted file mode 100644 index 1ff430b..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MQConsumerInner.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; - -import java.util.Set; - -
/**
 * Consumer inner interface.
 * <p>
 * Groups the accessors (group, model, type, start position, subscriptions) and the
 * maintenance callbacks (rebalance, offset persistence, topic subscribe-info updates)
 * that a consumer implementation provides.
 *
 * @author shijia.wxr
 */
public interface MQConsumerInner {

    /**
     * @return the name of the consumer group
     */
    String groupName();

    /**
     * @return the message model configured for this consumer
     */
    MessageModel messageModel();

    /**
     * @return the consume type of this consumer
     */
    ConsumeType consumeType();

    /**
     * @return the configured position to start consuming from
     */
    ConsumeFromWhere consumeFromWhere();

    /**
     * @return the set of subscriptions held by this consumer
     */
    Set<SubscriptionData> subscriptions();

    /**
     * Triggers a rebalance for this consumer.
     */
    void doRebalance();

    /**
     * Persists the consumer's current offsets.
     */
    void persistConsumerOffset();

    /**
     * Supplies the message queues currently known for a topic.
     *
     * @param topic the topic whose queue set is being updated
     * @param info  the message queues of that topic
     */
    void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info);

    /**
     * @param topic the topic to check
     * @return {@code true} if the subscribe info of the topic needs to be updated
     */
    boolean isSubscribeTopicNeedUpdate(final String topic);

    /**
     * @return whether this consumer runs in unit mode
     */
    boolean isUnitMode();

    /**
     * @return a snapshot describing the running state of this consumer
     */
    ConsumerRunningInfo consumerRunningInfo();
}
 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java deleted file mode 100644 index 9de7ac0..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/MessageQueueLock.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.common.message.MessageQueue; - -import java.util.concurrent.ConcurrentHashMap; - -
/**
 * Message lock: hands out one monitor object per message queue so that callers
 * synchronizing on it consume a single queue with only one thread at a time.
 *
 * @author shijia.wxr
 */
public class MessageQueueLock {
    private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

    /**
     * Returns the lock object associated with the given queue, creating it on first use.
     * Every caller asking for the same queue receives the same object, including when
     * several threads race to create it.
     *
     * @param mq the queue whose lock object is requested
     * @return the lock object registered for {@code mq}
     */
    public Object fetchLockObject(final MessageQueue mq) {
        final Object existing = this.mqLockTable.get(mq);
        if (existing != null) {
            return existing;
        }

        // Not registered yet: try to publish a fresh lock; if another thread won the race,
        // use the one it registered instead.
        final Object candidate = new Object();
        final Object winner = this.mqLockTable.putIfAbsent(mq, candidate);
        if (winner == null) {
            return candidate;
        }
        return winner;
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java deleted file mode 100644 index 05ffeb7..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ProcessQueue.java +++ /dev/null @@ -1,451 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.message.MessageAccessor; -import com.alibaba.rocketmq.common.message.MessageConst; -import com.alibaba.rocketmq.common.message.MessageExt; -import com.alibaba.rocketmq.common.protocol.body.ProcessQueueInfo; -import org.slf4j.Logger; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -
/**
 * Queue consumption snapshot.
 * <p>
 * Keeps the messages cached for one queue in {@code msgTreeMap}, keyed by queue offset and
 * guarded by {@code lockTreeMap}. Messages handed out by {@link #takeMessags(int)} are moved
 * to {@code msgTreeMapTemp} until they are committed, rolled back or re-queued.
 *
 * @author shijia.wxr
 */
public class ProcessQueue {
    public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
        Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
    public final static long REBALANCE_LOCK_INTERVAL =
        Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
    private final static long PULL_MAX_IDLE_TIME =
        Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
    private final Logger log = ClientLogger.getLog();
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
    private final AtomicLong msgCount = new AtomicLong();
    private final Lock lockConsume = new ReentrantLock();
    private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
    private final AtomicLong tryUnlockTimes = new AtomicLong(0);
    private volatile long queueOffsetMax = 0L;
    private volatile boolean dropped = false;
    private volatile long lastPullTimestamp = System.currentTimeMillis();
    private volatile long lastConsumeTimestamp = System.currentTimeMillis();
    private volatile boolean locked = false;
    private volatile long lastLockTimestamp = System.currentTimeMillis();
    private volatile boolean consuming = false;
    private volatile long msgAccCnt = 0;

    /**
     * @return {@code true} if more than {@link #REBALANCE_LOCK_MAX_LIVE_TIME} ms have passed
     * since the last lock timestamp
     */
    public boolean isLockExpired() {
        return (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
    }

    /**
     * @return {@code true} if more than {@code PULL_MAX_IDLE_TIME} ms have passed since the
     * last pull timestamp
     */
    public boolean isPullExpired() {
        return (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME;
    }

    /**
     * Sends back (at most 16 per call) the oldest cached messages whose consume start time is
     * older than the consumer's consume timeout, and removes each from the cache once sent.
     * Does nothing for orderly consumers.
     *
     * @param pushConsumer the consumer providing the timeout and the send-back operation
     */
    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
        if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
            return;
        }

        int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
        for (int i = 0; i < loop; i++) {
            MessageExt msg = null;
            try {
                this.lockTreeMap.readLock().lockInterruptibly();
                try {
                    if (!msgTreeMap.isEmpty()
                        && System.currentTimeMillis()
                        - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue()))
                        > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                        msg = msgTreeMap.firstEntry().getValue();
                    } else {
                        // The oldest message is not expired (or the cache is empty): stop scanning.
                        break;
                    }
                } finally {
                    this.lockTreeMap.readLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
            }

            if (msg == null) {
                // Lock acquisition was interrupted, so no message was selected. Previously this
                // fell through and tried to send back a null message.
                continue;
            }

            try {
                pushConsumer.sendMessageBack(msg, 3);
                log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
                try {
                    this.lockTreeMap.writeLock().lockInterruptibly();
                    try {
                        // Only drop the head if it is still the message that was just sent back.
                        if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                            try {
                                msgTreeMap.remove(msgTreeMap.firstKey());
                            } catch (Exception e) {
                                log.error("send expired msg exception", e);
                            }
                        }
                    } finally {
                        this.lockTreeMap.writeLock().unlock();
                    }
                } catch (InterruptedException e) {
                    log.error("getExpiredMsg exception", e);
                }
            } catch (Exception e) {
                log.error("send expired msg exception", e);
            }
        }
    }

    /**
     * Adds the given messages to the cache, keyed by queue offset, and refreshes the
     * accumulation count from the last message's max-offset property when present.
     *
     * @param msgs the messages to cache
     * @return {@code true} if the cache is non-empty and this call switched the queue into
     * the consuming state
     */
    public boolean putMessage(final List<MessageExt> msgs) {
        boolean dispatchToConsume = false;
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                int validMsgCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                    if (null == old) {
                        validMsgCnt++;
                        this.queueOffsetMax = msg.getQueueOffset();
                    }
                }
                msgCount.addAndGet(validMsgCnt);

                if (!msgTreeMap.isEmpty() && !this.consuming) {
                    dispatchToConsume = true;
                    this.consuming = true;
                }

                if (!msgs.isEmpty()) {
                    MessageExt messageExt = msgs.get(msgs.size() - 1);
                    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                    if (property != null) {
                        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                        if (accTotal > 0) {
                            this.msgAccCnt = accTotal;
                        }
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }

        return dispatchToConsume;
    }

    /**
     * @return the difference between the largest and smallest cached offsets, or 0 when the
     * cache is empty or the lock acquisition was interrupted
     */
    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getMaxSpan exception", e);
        }

        return 0;
    }

    /**
     * Removes the given messages from the cache.
     *
     * @param msgs the messages to remove
     * @return -1 if the cache was already empty (or an error occurred); otherwise the smallest
     * offset still cached, or {@code queueOffsetMax + 1} if the cache became empty
     */
    public long removeMessage(final List<MessageExt> msgs) {
        long result = -1;
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!msgTreeMap.isEmpty()) {
                    result = this.queueOffsetMax + 1;
                    int removedCnt = 0;
                    for (MessageExt msg : msgs) {
                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                        if (prev != null) {
                            removedCnt--;
                        }
                    }
                    // removedCnt is negative: this subtracts the number of removed messages.
                    msgCount.addAndGet(removedCnt);

                    if (!msgTreeMap.isEmpty()) {
                        result = msgTreeMap.firstKey();
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (Throwable t) {
            log.error("removeMessage exception", t);
        }

        return result;
    }

    public TreeMap<Long, MessageExt> getMsgTreeMap() {
        return msgTreeMap;
    }

    public AtomicLong getMsgCount() {
        return msgCount;
    }

    public boolean isDropped() {
        return dropped;
    }

    public void setDropped(boolean dropped) {
        this.dropped = dropped;
    }

    public boolean isLocked() {
        return locked;
    }

    public void setLocked(boolean locked) {
        this.locked = locked;
    }

    /**
     * Moves every message held in the temporary map back into the cache.
     */
    public void rollback() {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                this.msgTreeMap.putAll(this.msgTreeMapTemp);
                this.msgTreeMapTemp.clear();
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("rollback exception", e);
        }
    }

    /**
     * Discards the messages held in the temporary map and decreases the message count
     * accordingly.
     *
     * @return the largest offset that was held in the temporary map plus one, or -1 if the
     * temporary map was empty or the lock acquisition was interrupted
     */
    public long commit() {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                // TreeMap.lastKey() throws NoSuchElementException on an empty map, so an empty
                // temporary map must be handled explicitly.
                if (this.msgTreeMapTemp.isEmpty()) {
                    return -1;
                }

                long offset = this.msgTreeMapTemp.lastKey();
                msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
                this.msgTreeMapTemp.clear();
                return offset + 1;
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("commit exception", e);
        }

        return -1;
    }

    /**
     * Moves the given messages from the temporary map back into the cache.
     *
     * @param msgs the messages to make available again
     */
    public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                for (MessageExt msg : msgs) {
                    this.msgTreeMapTemp.remove(msg.getQueueOffset());
                    this.msgTreeMap.put(msg.getQueueOffset(), msg);
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("makeMessageToCosumeAgain exception", e);
        }
    }

    /**
     * Takes up to {@code batchSize} messages with the smallest offsets out of the cache and
     * parks them in the temporary map. Clears the consuming flag when nothing was taken.
     *
     * @param batchSize the maximum number of messages to take
     * @return the taken messages in offset order; empty if none were available
     */
    public List<MessageExt> takeMessags(final int batchSize) {
        List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    for (int i = 0; i < batchSize; i++) {
                        Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                        if (entry != null) {
                            result.add(entry.getValue());
                            msgTreeMapTemp.put(entry.getKey(), entry.getValue());
                        } else {
                            break;
                        }
                    }
                }

                if (result.isEmpty()) {
                    consuming = false;
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("take Messages exception", e);
        }

        return result;
    }

    /**
     * @return {@code true} if {@code msgTreeMap} is non-empty, or if the lock acquisition was
     * interrupted
     */
    public boolean hasTempMessage() {
        // NOTE(review): despite the name, this inspects msgTreeMap rather than msgTreeMapTemp.
        // Left unchanged because callers may rely on it - confirm the intended semantics.
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                return !this.msgTreeMap.isEmpty();
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            // Deliberately ignored: fall through and report true.
        }

        return true;
    }

    /**
     * Empties both maps and resets the message count and the maximum queue offset.
     */
    public void clear() {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                this.msgTreeMap.clear();
                this.msgTreeMapTemp.clear();
                this.msgCount.set(0);
                this.queueOffsetMax = 0L;
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("clear exception", e);
        }
    }

    public long getLastLockTimestamp() {
        return lastLockTimestamp;
    }

    public void setLastLockTimestamp(long lastLockTimestamp) {
        this.lastLockTimestamp = lastLockTimestamp;
    }

    public Lock getLockConsume() {
        return lockConsume;
    }

    public long getLastPullTimestamp() {
        return lastPullTimestamp;
    }

    public void setLastPullTimestamp(long lastPullTimestamp) {
        this.lastPullTimestamp = lastPullTimestamp;
    }

    public long getMsgAccCnt() {
        return msgAccCnt;
    }

    public void setMsgAccCnt(long msgAccCnt) {
        this.msgAccCnt = msgAccCnt;
    }

    public long getTryUnlockTimes() {
        return this.tryUnlockTimes.get();
    }

    public void incTryUnlockTimes() {
        this.tryUnlockTimes.incrementAndGet();
    }

    /**
     * Copies the current state of this queue (cached and temporary offset ranges and counts,
     * lock state, drop flag and timestamps) into {@code info}. Failures are logged and leave
     * {@code info} partially filled.
     *
     * @param info the object to populate
     */
    public void fillProcessQueueInfo(final ProcessQueueInfo info) {
        try {
            // Acquire outside the inner try so that unlock() only runs when the lock is held;
            // unlocking after an interrupted acquire throws IllegalMonitorStateException.
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
                    info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
                    info.setCachedMsgCount(this.msgTreeMap.size());
                }

                if (!this.msgTreeMapTemp.isEmpty()) {
                    info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
                    info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
                    info.setTransactionMsgCount(this.msgTreeMapTemp.size());
                }

                info.setLocked(this.locked);
                info.setTryUnlockTimes(this.tryUnlockTimes.get());
                info.setLastLockTimestamp(this.lastLockTimestamp);

                info.setDroped(this.dropped);
                info.setLastPullTimestamp(this.lastPullTimestamp);
                info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("fillProcessQueueInfo exception", e);
        }
    }

    public long getLastConsumeTimestamp() {
        return lastConsumeTimestamp;
    }

    public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
        this.lastConsumeTimestamp = lastConsumeTimestamp;
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java deleted file mode 100644 index 730b090..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.client.consumer.PullCallback; -import com.alibaba.rocketmq.client.consumer.PullResult; -import com.alibaba.rocketmq.client.consumer.PullStatus; -import com.alibaba.rocketmq.client.exception.MQBrokerException; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.hook.FilterMessageContext; -import com.alibaba.rocketmq.client.hook.FilterMessageHook; -import com.alibaba.rocketmq.client.impl.CommunicationMode; -import com.alibaba.rocketmq.client.impl.FindBrokerResult; -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.message.*; -import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; -import com.alibaba.rocketmq.common.protocol.route.TopicRouteData; -import com.alibaba.rocketmq.common.sysflag.PullSysFlag; -import com.alibaba.rocketmq.remoting.exception.RemotingException; -import org.slf4j.Logger; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - - -/** - * @author shijia.wxr - */ -public class PullAPIWrapper { - private final Logger log = ClientLogger.getLog(); - private final MQClientInstance mQClientFactory; - private final String consumerGroup; - private final boolean unitMode; - private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = - new ConcurrentHashMap<MessageQueue, AtomicLong>(32); - private volatile boolean connectBrokerByUser = false; - private volatile long defaultBrokerId = MixAll.MASTER_ID; - private Random random = new Random(System.currentTimeMillis()); - private 
ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); - - public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) { - this.mQClientFactory = mQClientFactory; - this.consumerGroup = consumerGroup; - this.unitMode = unitMode; - } - - public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, - final SubscriptionData subscriptionData) { - PullResultExt pullResultExt = (PullResultExt) pullResult; - - this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); - if (PullStatus.FOUND == pullResult.getPullStatus()) { - ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); - List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer); - - List<MessageExt> msgListFilterAgain = msgList; - if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { - msgListFilterAgain = new ArrayList<MessageExt>(msgList.size()); - for (MessageExt msg : msgList) { - if (msg.getTags() != null) { - if (subscriptionData.getTagsSet().contains(msg.getTags())) { - msgListFilterAgain.add(msg); - } - } - } - } - - if (this.hasHook()) { - FilterMessageContext filterMessageContext = new FilterMessageContext(); - filterMessageContext.setUnitMode(unitMode); - filterMessageContext.setMsgList(msgListFilterAgain); - this.executeHook(filterMessageContext); - } - - for (MessageExt msg : msgListFilterAgain) { - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, - Long.toString(pullResult.getMinOffset())); - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, - Long.toString(pullResult.getMaxOffset())); - } - - pullResultExt.setMsgFoundList(msgListFilterAgain); - } - - pullResultExt.setMessageBinary(null); - - return pullResult; - } - - public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) { - AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); - if (null == suggest) { - 
this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId)); - } else { - suggest.set(brokerId); - } - } - - public boolean hasHook() { - return !this.filterMessageHookList.isEmpty(); - } - - public void executeHook(final FilterMessageContext context) { - if (!this.filterMessageHookList.isEmpty()) { - for (FilterMessageHook hook : this.filterMessageHookList) { - try { - hook.filterMessage(context); - } catch (Throwable e) { - log.error("execute hook error. hookName={}", hook.hookName()); - } - } - } - } - - public PullResult pullKernelImpl( - final MessageQueue mq, - final String subExpression, - final long subVersion, - final long offset, - final int maxNums, - final int sysFlag, - final long commitOffset, - final long brokerSuspendMaxTimeMillis, - final long timeoutMillis, - final CommunicationMode communicationMode, - final PullCallback pullCallback - ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - FindBrokerResult findBrokerResult = - this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), - this.recalculatePullFromWhichNode(mq), false); - if (null == findBrokerResult) { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - findBrokerResult = - this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), - this.recalculatePullFromWhichNode(mq), false); - } - - if (findBrokerResult != null) { - int sysFlagInner = sysFlag; - - if (findBrokerResult.isSlave()) { - sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner); - } - - PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); - requestHeader.setConsumerGroup(this.consumerGroup); - requestHeader.setTopic(mq.getTopic()); - requestHeader.setQueueId(mq.getQueueId()); - requestHeader.setQueueOffset(offset); - requestHeader.setMaxMsgNums(maxNums); - requestHeader.setSysFlag(sysFlagInner); - requestHeader.setCommitOffset(commitOffset); - requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); 
- requestHeader.setSubscription(subExpression); - requestHeader.setSubVersion(subVersion); - - String brokerAddr = findBrokerResult.getBrokerAddr(); - if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { - brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); - } - - PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( - brokerAddr, - requestHeader, - timeoutMillis, - communicationMode, - pullCallback); - - return pullResult; - } - - throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); - } - - public long recalculatePullFromWhichNode(final MessageQueue mq) { - if (this.isConnectBrokerByUser()) { - return this.defaultBrokerId; - } - - AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); - if (suggest != null) { - return suggest.get(); - } - - return MixAll.MASTER_ID; - } - - private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) - throws MQClientException { - ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable(); - if (topicRouteTable != null) { - TopicRouteData topicRouteData = topicRouteTable.get(topic); - List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr); - - if (list != null && !list.isEmpty()) { - return list.get(randomNum() % list.size()); - } - } - - throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: " - + topic, null); - } - - public boolean isConnectBrokerByUser() { - return connectBrokerByUser; - } - - public int randomNum() { - int value = random.nextInt(); - if (value < 0) { - value = Math.abs(value); - if (value < 0) - value = 0; - } - return value; - } - - public void setConnectBrokerByUser(boolean connectBrokerByUser) { - this.connectBrokerByUser = connectBrokerByUser; - - } - - public void registerFilterMessageHook(ArrayList<FilterMessageHook> filterMessageHookList) { - this.filterMessageHookList = 
filterMessageHookList; - } - - public long getDefaultBrokerId() { - return defaultBrokerId; - } - - public void setDefaultBrokerId(long defaultBrokerId) { - this.defaultBrokerId = defaultBrokerId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java deleted file mode 100644 index 161a039..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullMessageService.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.ServiceThread; -import org.slf4j.Logger; - -import java.util.concurrent.*; - - -/** - * @author shijia.wxr - */ -public class PullMessageService extends ServiceThread { - private final Logger log = ClientLogger.getLog(); - private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>(); - private final MQClientInstance mQClientFactory; - private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "PullMessageServiceScheduledThread"); - } - }); - - public PullMessageService(MQClientInstance mQClientFactory) { - this.mQClientFactory = mQClientFactory; - } - - public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { - this.scheduledExecutorService.schedule(new Runnable() { - - @Override - public void run() { - PullMessageService.this.executePullRequestImmediately(pullRequest); - } - }, timeDelay, TimeUnit.MILLISECONDS); - } - - public void executePullRequestImmediately(final PullRequest pullRequest) { - try { - this.pullRequestQueue.put(pullRequest); - } catch (InterruptedException e) { - log.error("executePullRequestImmediately pullRequestQueue.put", e); - } - } - - public void executeTaskLater(final Runnable r, final long timeDelay) { - this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS); - } - - public ScheduledExecutorService getScheduledExecutorService() { - return scheduledExecutorService; - } - - private void pullMessage(final PullRequest pullRequest) { - final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); - if (consumer != null) { - 
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; - impl.pullMessage(pullRequest); - } else { - log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); - } - } - - - @Override - public void run() { - log.info(this.getServiceName() + " service started"); - - while (!this.isStopped()) { - try { - PullRequest pullRequest = this.pullRequestQueue.take(); - if (pullRequest != null) { - this.pullMessage(pullRequest); - } - } catch (InterruptedException e) { - } catch (Exception e) { - log.error("Pull Message Service Run Method exception", e); - } - } - - log.info(this.getServiceName() + " service end"); - } - - - @Override - public String getServiceName() { - return PullMessageService.class.getSimpleName(); - } - - -}
