http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java new file mode 100644 index 0000000..19016ca --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientManager.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.client.impl; + +import com.alibaba.rocketmq.client.ClientConfig; +import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; +import com.alibaba.rocketmq.remoting.RPCHook; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * @author shijia.wxr + */ +public class MQClientManager { + private static MQClientManager instance = new MQClientManager(); + private AtomicInteger factoryIndexGenerator = new AtomicInteger(); + private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable = + new ConcurrentHashMap<String, MQClientInstance>(); + + + private MQClientManager() { + + } + + + public static MQClientManager getInstance() { + return instance; + } + + public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig) { + return getAndCreateMQClientInstance(clientConfig, null); + } + + public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { + String clientId = clientConfig.buildMQClientId(); + MQClientInstance instance = this.factoryTable.get(clientId); + if (null == instance) { + instance = + new MQClientInstance(clientConfig.cloneClientConfig(), + this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); + MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); + if (prev != null) { + instance = prev; + } else { + // TODO log + } + } + + return instance; + } + + public void removeClientFactory(final String clientId) { + this.factoryTable.remove(clientId); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java new file mode 100644 index 0000000..4dee764 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -0,0 +1,471 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import com.alibaba.rocketmq.client.consumer.listener.ConsumeReturnType; +import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import com.alibaba.rocketmq.client.hook.ConsumeMessageContext; +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.client.stat.ConsumerStatsManager; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.ThreadFactoryImpl; +import com.alibaba.rocketmq.common.message.MessageAccessor; +import com.alibaba.rocketmq.common.message.MessageConst; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.body.CMResult; +import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import org.slf4j.Logger; + +import java.util.*; +import java.util.concurrent.*; + + +/** + * @author shijia.wxr + */ +public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { + private static final Logger log = ClientLogger.getLog(); + private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; + private final DefaultMQPushConsumer defaultMQPushConsumer; + private final MessageListenerConcurrently messageListener; + private final BlockingQueue<Runnable> consumeRequestQueue; + private final ThreadPoolExecutor consumeExecutor; + private final String consumerGroup; + + private final ScheduledExecutorService scheduledExecutorService; + private final ScheduledExecutorService cleanExpireMsgExecutors; + + + public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, + 
MessageListenerConcurrently messageListener) { + this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; + this.messageListener = messageListener; + + this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); + this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); + this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); + + this.consumeExecutor = new ThreadPoolExecutor(// + this.defaultMQPushConsumer.getConsumeThreadMin(), // + this.defaultMQPushConsumer.getConsumeThreadMax(), // + 1000 * 60, // + TimeUnit.MILLISECONDS, // + this.consumeRequestQueue, // + new ThreadFactoryImpl("ConsumeMessageThread_")); + + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); + this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_")); + } + + + public void start() { + this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { + + @Override + public void run() { + cleanExpireMsg(); + } + + }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); + } + + + public void shutdown() { + this.scheduledExecutorService.shutdown(); + this.consumeExecutor.shutdown(); + this.cleanExpireMsgExecutors.shutdown(); + } + + @Override + public void updateCorePoolSize(int corePoolSize) { + if (corePoolSize > 0 // + && corePoolSize <= Short.MAX_VALUE // + && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { + this.consumeExecutor.setCorePoolSize(corePoolSize); + } + } + + @Override + public void incCorePoolSize() { + // long corePoolSize = this.consumeExecutor.getCorePoolSize(); + // if (corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) + // { + // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize() + // + 1); + // } + // + // log.info("incCorePoolSize Concurrently from {} 
to {}, ConsumerGroup: + // {}", // + // corePoolSize,// + // this.consumeExecutor.getCorePoolSize(),// + // this.consumerGroup); + } + + @Override + public void decCorePoolSize() { + // long corePoolSize = this.consumeExecutor.getCorePoolSize(); + // if (corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin()) + // { + // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize() + // - 1); + // } + // + // log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup: + // {}", // + // corePoolSize,// + // this.consumeExecutor.getCorePoolSize(),// + // this.consumerGroup); + } + + @Override + public int getCorePoolSize() { + return this.consumeExecutor.getCorePoolSize(); + } + + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) { + ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult(); + result.setOrder(false); + result.setAutoCommit(true); + + List<MessageExt> msgs = new ArrayList<MessageExt>(); + msgs.add(msg); + MessageQueue mq = new MessageQueue(); + mq.setBrokerName(brokerName); + mq.setTopic(msg.getTopic()); + mq.setQueueId(msg.getQueueId()); + + ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(mq); + + this.resetRetryTopic(msgs); + + final long beginTime = System.currentTimeMillis(); + + log.info("consumeMessageDirectly receive new messge: {}", msg); + + try { + ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context); + if (status != null) { + switch (status) { + case CONSUME_SUCCESS: + result.setConsumeResult(CMResult.CR_SUCCESS); + break; + case RECONSUME_LATER: + result.setConsumeResult(CMResult.CR_LATER); + break; + default: + break; + } + } else { + result.setConsumeResult(CMResult.CR_RETURN_NULL); + } + } catch (Throwable e) { + result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); + result.setRemark(RemotingHelper.exceptionSimpleDesc(e)); + + log.warn(String.format("consumeMessageDirectly 
exception: %s Group: %s Msgs: %s MQ: %s", // + RemotingHelper.exceptionSimpleDesc(e), // + ConsumeMessageConcurrentlyService.this.consumerGroup, // + msgs, // + mq), e); + } + + result.setSpentTimeMills(System.currentTimeMillis() - beginTime); + + log.info("consumeMessageDirectly Result: {}", result); + + return result; + } + + @Override + public void submitConsumeRequest(// + final List<MessageExt> msgs, // + final ProcessQueue processQueue, // + final MessageQueue messageQueue, // + final boolean dispatchToConsume) { + final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); + if (msgs.size() <= consumeBatchSize) { + ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); + try { + this.consumeExecutor.submit(consumeRequest); + } catch (RejectedExecutionException e) { + this.submitConsumeRequestLater(consumeRequest); + } + } else { + for (int total = 0; total < msgs.size(); ) { + List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); + for (int i = 0; i < consumeBatchSize; i++, total++) { + if (total < msgs.size()) { + msgThis.add(msgs.get(total)); + } else { + break; + } + } + + ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); + try { + this.consumeExecutor.submit(consumeRequest); + } catch (RejectedExecutionException e) { + for (; total < msgs.size(); total++) { + msgThis.add(msgs.get(total)); + } + + this.submitConsumeRequestLater(consumeRequest); + } + } + } + } + + public void resetRetryTopic(final List<MessageExt> msgs) { + final String groupTopic = MixAll.getRetryTopic(consumerGroup); + for (MessageExt msg : msgs) { + String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC); + if (retryTopic != null && groupTopic.equals(msg.getTopic())) { + msg.setTopic(retryTopic); + } + } + } + + private void cleanExpireMsg() { + Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = + 
this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<MessageQueue, ProcessQueue> next = it.next(); + ProcessQueue pq = next.getValue(); + pq.cleanExpiredMsg(this.defaultMQPushConsumer); + } + } + + public void processConsumeResult(// + final ConsumeConcurrentlyStatus status, // + final ConsumeConcurrentlyContext context, // + final ConsumeRequest consumeRequest// + ) { + int ackIndex = context.getAckIndex(); + + if (consumeRequest.getMsgs().isEmpty()) + return; + + switch (status) { + case CONSUME_SUCCESS: + if (ackIndex >= consumeRequest.getMsgs().size()) { + ackIndex = consumeRequest.getMsgs().size() - 1; + } + int ok = ackIndex + 1; + int failed = consumeRequest.getMsgs().size() - ok; + this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); + this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); + break; + case RECONSUME_LATER: + ackIndex = -1; + this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), + consumeRequest.getMsgs().size()); + break; + default: + break; + } + + switch (this.defaultMQPushConsumer.getMessageModel()) { + case BROADCASTING: + for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { + MessageExt msg = consumeRequest.getMsgs().get(i); + log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); + } + break; + case CLUSTERING: + List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); + for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { + MessageExt msg = consumeRequest.getMsgs().get(i); + boolean result = this.sendMessageBack(msg, context); + if (!result) { + msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); + msgBackFailed.add(msg); + } + } + + if (!msgBackFailed.isEmpty()) { + 
consumeRequest.getMsgs().removeAll(msgBackFailed); + + this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); + } + break; + default: + break; + } + + long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); + if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { + this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); + } + } + + public ConsumerStatsManager getConsumerStatsManager() { + return this.defaultMQPushConsumerImpl.getConsumerStatsManager(); + } + + public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) { + int delayLevel = context.getDelayLevelWhenNextConsume(); + + try { + this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName()); + return true; + } catch (Exception e) { + log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e); + } + + return false; + } + + private void submitConsumeRequestLater(// + final List<MessageExt> msgs, // + final ProcessQueue processQueue, // + final MessageQueue messageQueue// + ) { + + this.scheduledExecutorService.schedule(new Runnable() { + + @Override + public void run() { + ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true); + } + }, 5000, TimeUnit.MILLISECONDS); + } + + private void submitConsumeRequestLater(final ConsumeRequest consumeRequest// + ) { + + this.scheduledExecutorService.schedule(new Runnable() { + + @Override + public void run() { + ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest); + } + }, 5000, TimeUnit.MILLISECONDS); + } + + class ConsumeRequest implements Runnable { + private final List<MessageExt> msgs; + private final ProcessQueue processQueue; + private final MessageQueue messageQueue; + + + public ConsumeRequest(List<MessageExt> 
msgs, ProcessQueue processQueue, MessageQueue messageQueue) { + this.msgs = msgs; + this.processQueue = processQueue; + this.messageQueue = messageQueue; + } + + public List<MessageExt> getMsgs() { + return msgs; + } + + public ProcessQueue getProcessQueue() { + return processQueue; + } + + @Override + public void run() { + if (this.processQueue.isDropped()) { + log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); + return; + } + + MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; + ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); + ConsumeConcurrentlyStatus status = null; + + ConsumeMessageContext consumeMessageContext = null; + if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { + consumeMessageContext = new ConsumeMessageContext(); + consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup()); + consumeMessageContext.setProps(new HashMap<String, String>()); + consumeMessageContext.setMq(messageQueue); + consumeMessageContext.setMsgList(msgs); + consumeMessageContext.setSuccess(false); + ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); + } + + long beginTimestamp = System.currentTimeMillis(); + boolean hasException = false; + ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; + try { + ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs); + if (msgs != null && !msgs.isEmpty()) { + for (MessageExt msg : msgs) { + MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); + } + } + status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); + } catch (Throwable e) { + log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", + RemotingHelper.exceptionSimpleDesc(e), // + 
ConsumeMessageConcurrentlyService.this.consumerGroup, + msgs, + messageQueue); + hasException = true; + } + long consumeRT = System.currentTimeMillis() - beginTimestamp; + if (null == status) { + if (hasException) { + returnType = ConsumeReturnType.EXCEPTION; + } else { + returnType = ConsumeReturnType.RETURNNULL; + } + } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { + returnType = ConsumeReturnType.TIME_OUT; + } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { + returnType = ConsumeReturnType.FAILED; + } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { + returnType = ConsumeReturnType.SUCCESS; + } + consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); + if (null == status) { + log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", + ConsumeMessageConcurrentlyService.this.consumerGroup, + msgs, + messageQueue); + status = ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + + if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { + consumeMessageContext.setStatus(status.toString()); + consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status); + ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); + } + + ConsumeMessageConcurrentlyService.this.getConsumerStatsManager() + .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); + + if (!processQueue.isDropped()) { + ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); + } else { + log.warn("processQueue is dropped without process consume result. 
messageQueue={}, msgs={}", messageQueue, msgs); + } + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java new file mode 100644 index 0000000..82903b0 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -0,0 +1,536 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; +import com.alibaba.rocketmq.client.consumer.listener.*; +import com.alibaba.rocketmq.client.hook.ConsumeMessageContext; +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.client.stat.ConsumerStatsManager; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.ThreadFactoryImpl; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.message.*; +import com.alibaba.rocketmq.common.protocol.body.CMResult; +import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; +import com.alibaba.rocketmq.remoting.common.RemotingHelper; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.*; + + +/** + * @author shijia.wxr + */ +public class ConsumeMessageOrderlyService implements ConsumeMessageService { + private static final Logger log = ClientLogger.getLog(); + private final static long MAX_TIME_CONSUME_CONTINUOUSLY = + Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000")); + private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; + private final DefaultMQPushConsumer defaultMQPushConsumer; + private final MessageListenerOrderly messageListener; + private final BlockingQueue<Runnable> consumeRequestQueue; + private final ThreadPoolExecutor consumeExecutor; + private final String consumerGroup; + private final MessageQueueLock messageQueueLock = new MessageQueueLock(); + private final ScheduledExecutorService scheduledExecutorService; + private volatile boolean stopped = false; + + + public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) { + 
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; + this.messageListener = messageListener; + + this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); + this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); + this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); + + this.consumeExecutor = new ThreadPoolExecutor(// + this.defaultMQPushConsumer.getConsumeThreadMin(), // + this.defaultMQPushConsumer.getConsumeThreadMax(), // + 1000 * 60, // + TimeUnit.MILLISECONDS, // + this.consumeRequestQueue, // + new ThreadFactoryImpl("ConsumeMessageThread_")); + + this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); + } + + + public void start() { + if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + ConsumeMessageOrderlyService.this.lockMQPeriodically(); + } + }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); + } + } + + + public void shutdown() { + this.stopped = true; + this.scheduledExecutorService.shutdown(); + this.consumeExecutor.shutdown(); + if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { + this.unlockAllMQ(); + } + } + + + public synchronized void unlockAllMQ() { + this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false); + } + + @Override + public void updateCorePoolSize(int corePoolSize) { + if (corePoolSize > 0 // + && corePoolSize <= Short.MAX_VALUE // + && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { + this.consumeExecutor.setCorePoolSize(corePoolSize); + } + } + + @Override + public void incCorePoolSize() { + } + + @Override + public void decCorePoolSize() { + } + + @Override + public int getCorePoolSize() { + return this.consumeExecutor.getCorePoolSize(); + } + + 
@Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String brokerName) { + ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult(); + result.setOrder(true); + + List<MessageExt> msgs = new ArrayList<MessageExt>(); + msgs.add(msg); + MessageQueue mq = new MessageQueue(); + mq.setBrokerName(brokerName); + mq.setTopic(msg.getTopic()); + mq.setQueueId(msg.getQueueId()); + + ConsumeOrderlyContext context = new ConsumeOrderlyContext(mq); + + final long beginTime = System.currentTimeMillis(); + + log.info("consumeMessageDirectly receive new messge: {}", msg); + + try { + ConsumeOrderlyStatus status = this.messageListener.consumeMessage(msgs, context); + if (status != null) { + switch (status) { + case COMMIT: + result.setConsumeResult(CMResult.CR_COMMIT); + break; + case ROLLBACK: + result.setConsumeResult(CMResult.CR_ROLLBACK); + break; + case SUCCESS: + result.setConsumeResult(CMResult.CR_SUCCESS); + break; + case SUSPEND_CURRENT_QUEUE_A_MOMENT: + result.setConsumeResult(CMResult.CR_LATER); + break; + default: + break; + } + } else { + result.setConsumeResult(CMResult.CR_RETURN_NULL); + } + } catch (Throwable e) { + result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); + result.setRemark(RemotingHelper.exceptionSimpleDesc(e)); + + log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", // + RemotingHelper.exceptionSimpleDesc(e), // + ConsumeMessageOrderlyService.this.consumerGroup, // + msgs, // + mq), e); + } + + result.setAutoCommit(context.isAutoCommit()); + result.setSpentTimeMills(System.currentTimeMillis() - beginTime); + + log.info("consumeMessageDirectly Result: {}", result); + + return result; + } + + @Override + public void submitConsumeRequest(// + final List<MessageExt> msgs, // + final ProcessQueue processQueue, // + final MessageQueue messageQueue, // + final boolean dispathToConsume) { + if (dispathToConsume) { + ConsumeRequest consumeRequest = new 
ConsumeRequest(processQueue, messageQueue); + this.consumeExecutor.submit(consumeRequest); + } + } + + public synchronized void lockMQPeriodically() { + if (!this.stopped) { + this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); + } + } + + public void tryLockLaterAndReconsume(final MessageQueue mq, final ProcessQueue processQueue, final long delayMills) { + this.scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + boolean lockOK = ConsumeMessageOrderlyService.this.lockOneMQ(mq); + if (lockOK) { + ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 10); + } else { + ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, mq, 3000); + } + } + }, delayMills, TimeUnit.MILLISECONDS); + } + + public synchronized boolean lockOneMQ(final MessageQueue mq) { + if (!this.stopped) { + return this.defaultMQPushConsumerImpl.getRebalanceImpl().lock(mq); + } + + return false; + } + + private void submitConsumeRequestLater(// + final ProcessQueue processQueue, // + final MessageQueue messageQueue, // + final long suspendTimeMillis// + ) { + long timeMillis = suspendTimeMillis; + if (timeMillis == -1) { + timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis(); + } + + if (timeMillis < 10) { + timeMillis = 10; + } else if (timeMillis > 30000) { + timeMillis = 30000; + } + + this.scheduledExecutorService.schedule(new Runnable() { + + @Override + public void run() { + ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true); + } + }, timeMillis, TimeUnit.MILLISECONDS); + } + + public boolean processConsumeResult(// + final List<MessageExt> msgs, // + final ConsumeOrderlyStatus status, // + final ConsumeOrderlyContext context, // + final ConsumeRequest consumeRequest// + ) { + boolean continueConsume = true; + long commitOffset = -1L; + if (context.isAutoCommit()) { + switch (status) { + case COMMIT: + case ROLLBACK: + log.warn("the message 
queue consume result is illegal, we think you want to ack these message {}", + consumeRequest.getMessageQueue()); + case SUCCESS: + commitOffset = consumeRequest.getProcessQueue().commit(); + this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); + break; + case SUSPEND_CURRENT_QUEUE_A_MOMENT: + this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); + if (checkReconsumeTimes(msgs)) { + consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); + this.submitConsumeRequestLater(// + consumeRequest.getProcessQueue(), // + consumeRequest.getMessageQueue(), // + context.getSuspendCurrentQueueTimeMillis()); + continueConsume = false; + } else { + commitOffset = consumeRequest.getProcessQueue().commit(); + } + break; + default: + break; + } + } else { + switch (status) { + case SUCCESS: + this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); + break; + case COMMIT: + commitOffset = consumeRequest.getProcessQueue().commit(); + break; + case ROLLBACK: + consumeRequest.getProcessQueue().rollback(); + this.submitConsumeRequestLater(// + consumeRequest.getProcessQueue(), // + consumeRequest.getMessageQueue(), // + context.getSuspendCurrentQueueTimeMillis()); + continueConsume = false; + break; + case SUSPEND_CURRENT_QUEUE_A_MOMENT: + this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); + if (checkReconsumeTimes(msgs)) { + consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); + this.submitConsumeRequestLater(// + consumeRequest.getProcessQueue(), // + consumeRequest.getMessageQueue(), // + context.getSuspendCurrentQueueTimeMillis()); + continueConsume = false; + } + break; + default: + break; + } + } + + if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { + 
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); + } + + return continueConsume; + } + + public ConsumerStatsManager getConsumerStatsManager() { + return this.defaultMQPushConsumerImpl.getConsumerStatsManager(); + } + + private int getMaxReconsumeTimes() { + // default reconsume times: Integer.MAX_VALUE + if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) { + return Integer.MAX_VALUE; + } else { + return this.defaultMQPushConsumer.getMaxReconsumeTimes(); + } + } + + private boolean checkReconsumeTimes(List<MessageExt> msgs) { + boolean suspend = false; + if (msgs != null && !msgs.isEmpty()) { + for (MessageExt msg : msgs) { + if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) { + MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes())); + if (!sendMessageBack(msg)) { + suspend = true; + msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); + } + } else { + suspend = true; + msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); + } + } + } + return suspend; + } + + public boolean sendMessageBack(final MessageExt msg) { + try { + // max reconsume times exceeded then send to dead letter queue. + Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody()); + String originMsgId = MessageAccessor.getOriginMessageId(msg); + MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? 
msg.getMsgId() : originMsgId); + newMsg.setFlag(msg.getFlag()); + MessageAccessor.setProperties(newMsg, msg.getProperties()); + MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic()); + MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes())); + MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); + newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); + + this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg); + return true; + } catch (Exception e) { + log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e); + } + + return false; + } + + class ConsumeRequest implements Runnable { + private final ProcessQueue processQueue; + private final MessageQueue messageQueue; + + + public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) { + this.processQueue = processQueue; + this.messageQueue = messageQueue; + } + + public ProcessQueue getProcessQueue() { + return processQueue; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + @Override + public void run() { + if (this.processQueue.isDropped()) { + log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); + return; + } + + final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); + synchronized (objLock) { + if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) + || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { + final long beginTime = System.currentTimeMillis(); + for (boolean continueConsume = true; continueConsume; ) { + if (this.processQueue.isDropped()) { + log.warn("the message queue not be able to consume, because it's dropped. 
{}", this.messageQueue); + break; + } + + if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) + && !this.processQueue.isLocked()) { + log.warn("the message queue not locked, so consume later, {}", this.messageQueue); + ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); + break; + } + + if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) + && this.processQueue.isLockExpired()) { + log.warn("the message queue lock expired, so consume later, {}", this.messageQueue); + ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); + break; + } + + long interval = System.currentTimeMillis() - beginTime; + if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { + ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); + break; + } + + final int consumeBatchSize = + ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); + + List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); + if (!msgs.isEmpty()) { + final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); + + ConsumeOrderlyStatus status = null; + + ConsumeMessageContext consumeMessageContext = null; + if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { + consumeMessageContext = new ConsumeMessageContext(); + consumeMessageContext + .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup()); + consumeMessageContext.setMq(messageQueue); + consumeMessageContext.setMsgList(msgs); + consumeMessageContext.setSuccess(false); + // init the consume context type + consumeMessageContext.setProps(new HashMap<String, String>()); + ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext); + } + + long beginTimestamp = 
System.currentTimeMillis(); + ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; + boolean hasException = false; + try { + this.processQueue.getLockConsume().lock(); + if (this.processQueue.isDropped()) { + log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", + this.messageQueue); + break; + } + + status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); + } catch (Throwable e) { + log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", // + RemotingHelper.exceptionSimpleDesc(e), // + ConsumeMessageOrderlyService.this.consumerGroup, // + msgs, // + messageQueue); + hasException = true; + } finally { + this.processQueue.getLockConsume().unlock(); + } + + if (null == status // + || ConsumeOrderlyStatus.ROLLBACK == status// + || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { + log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", // + ConsumeMessageOrderlyService.this.consumerGroup, // + msgs, // + messageQueue); + } + + long consumeRT = System.currentTimeMillis() - beginTimestamp; + if (null == status) { + if (hasException) { + returnType = ConsumeReturnType.EXCEPTION; + } else { + returnType = ConsumeReturnType.RETURNNULL; + } + } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { + returnType = ConsumeReturnType.TIME_OUT; + } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { + returnType = ConsumeReturnType.FAILED; + } else if (ConsumeOrderlyStatus.SUCCESS == status) { + returnType = ConsumeReturnType.SUCCESS; + } + consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); + if (null == status) { + status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + + if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { + consumeMessageContext.setStatus(status.toString()); + consumeMessageContext + .setSuccess(ConsumeOrderlyStatus.SUCCESS 
== status || ConsumeOrderlyStatus.COMMIT == status); + ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext); + } + + ConsumeMessageOrderlyService.this.getConsumerStatsManager() + .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT); + + continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); + } else { + continueConsume = false; + } + } + } else { + if (this.processQueue.isDropped()) { + log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); + return; + } + + ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); + } + } + } + + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java new file mode 100644 index 0000000..1f7f0d9 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/ConsumeMessageService.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public interface ConsumeMessageService { + void start(); + + + void shutdown(); + + + void updateCorePoolSize(int corePoolSize); + + + void incCorePoolSize(); + + + void decCorePoolSize(); + + + int getCorePoolSize(); + + + ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName); + + + void submitConsumeRequest(// + final List<MessageExt> msgs, // + final ProcessQueue processQueue, // + final MessageQueue messageQueue, // + final boolean dispathToConsume); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java new file mode 100644 index 0000000..1785ec9 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -0,0 +1,706 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.client.impl.consumer;

import com.alibaba.rocketmq.client.QueryResult;
import com.alibaba.rocketmq.client.Validators;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullCallback;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
import com.alibaba.rocketmq.client.hook.ConsumeMessageHook;
import com.alibaba.rocketmq.client.hook.FilterMessageHook;
import com.alibaba.rocketmq.client.impl.CommunicationMode;
import com.alibaba.rocketmq.client.impl.MQClientManager;
import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
import com.alibaba.rocketmq.client.log.ClientLogger;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.ServiceState;
import com.alibaba.rocketmq.common.UtilAll;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.filter.FilterAPI;
import com.alibaba.rocketmq.common.help.FAQUrl;
import com.alibaba.rocketmq.common.message.*;
import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
import com.alibaba.rocketmq.remoting.RPCHook;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


/**
 * Implementation behind {@link DefaultMQPullConsumer}: validates configuration, wires the shared
 * {@link MQClientInstance}, the offset store and the pull API wrapper on {@link #start()}, and
 * exposes synchronous and asynchronous pull operations plus admin-style queries. Almost every
 * operation requires the service to be in state {@code RUNNING} (see {@link #makeSureStateOK()}).
 *
 * @author shijia.wxr
 */
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
    private final Logger log = ClientLogger.getLog();
    private final DefaultMQPullConsumer defaultMQPullConsumer;
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final RPCHook rpcHook;
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private OffsetStore offsetStore;
    private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);


    public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
        this.defaultMQPullConsumer = defaultMQPullConsumer;
        this.rpcHook = rpcHook;
    }

    /** Adds a hook that is invoked around every synchronous pull. */
    public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
        this.consumeMessageHookList.add(hook);
        log.info("register consumeMessageHook Hook, {}", hook.hookName());
    }

    /** Same as {@link #createTopic(String, String, int, int)} with a topic system flag of {@code 0}. */
    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
        createTopic(key, newTopic, queueNum, 0);
    }

    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
        this.makeSureStateOK();
        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
    }

    /**
     * Guards operations that need a started consumer.
     *
     * @throws MQClientException if the service state is anything other than {@code RUNNING}
     */
    private void makeSureStateOK() throws MQClientException {
        if (this.serviceState != ServiceState.RUNNING) {
            throw new MQClientException("The consumer service state not OK, "//
                    + this.serviceState//
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
        }
    }

    /**
     * Reads the consume offset of {@code mq} from the offset store.
     *
     * @param fromStore {@code true} to read with {@code READ_FROM_STORE}, {@code false} to use
     *                  {@code MEMORY_FIRST_THEN_STORE}
     */
    public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
        this.makeSureStateOK();
        return this.offsetStore.readOffset(mq, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
    }

    /**
     * Returns the message queues of {@code topic} that are currently present in the rebalance
     * implementation's process-queue table.
     *
     * @throws IllegalArgumentException if {@code topic} is {@code null}
     */
    public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException {
        this.makeSureStateOK();
        if (null == topic) {
            throw new IllegalArgumentException("topic is null");
        }

        ConcurrentHashMap<MessageQueue, ProcessQueue> mqTable = this.rebalanceImpl.getProcessQueueTable();
        Set<MessageQueue> mqResult = new HashSet<MessageQueue>();
        for (MessageQueue mq : mqTable.keySet()) {
            if (mq.getTopic().equals(topic)) {
                mqResult.add(mq);
            }
        }

        return mqResult;
    }

    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
    }

    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
    }

    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
    }

    public long maxOffset(MessageQueue mq) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
    }

    public long minOffset(MessageQueue mq) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
    }

    /** Synchronous, non-blocking pull using the consumer's configured pull timeout. */
    public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return pull(mq, subExpression, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
    }

    /** Synchronous, non-blocking pull with an explicit timeout in milliseconds. */
    public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
    }

    /**
     * Shared implementation of the synchronous pull variants.
     *
     * @param block   whether the pull is issued in blocking (suspend) mode; when {@code true} the
     *                consumer's {@code consumerTimeoutMillisWhenSuspend} is used instead of {@code timeout}
     * @param timeout request timeout in milliseconds for the non-blocking case
     * @throws MQClientException if the consumer is not running, {@code mq} is {@code null},
     *                           {@code offset < 0}, {@code maxNums <= 0}, or the subscription
     *                           expression cannot be parsed
     */
    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();

        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }

        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }

        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }

        this.subscriptionAutomatically(mq.getTopic());

        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

        SubscriptionData subscriptionData;
        try {
            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
                    mq.getTopic(), subExpression);
        } catch (Exception e) {
            throw new MQClientException("parse subscription error", e);
        }

        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
                mq, // 1
                subscriptionData.getSubString(), // 2
                0L, // 3
                offset, // 4
                maxNums, // 5
                sysFlag, // 6
                0, // 7
                this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
                timeoutMillis, // 9
                CommunicationMode.SYNC, // 10
                null// 11
        );
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        if (!this.consumeMessageHookList.isEmpty()) {
            // The pull itself is reported to the hooks as a successful "consume".
            ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
    }

    /**
     * Registers a subscribe-all subscription for {@code topic} if none exists yet. Best effort:
     * a failure to build the subscription is logged and otherwise ignored.
     */
    public void subscriptionAutomatically(final String topic) {
        if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
            try {
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
                        topic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
            } catch (Exception e) {
                // Deliberately non-fatal, but no longer silent.
                log.warn("subscriptionAutomatically exception, topic: " + topic, e);
            }
        }
    }

    public void unsubscribe(String topic) {
        this.rebalanceImpl.getSubscriptionInner().remove(topic);
    }

    @Override
    public String groupName() {
        return this.defaultMQPullConsumer.getConsumerGroup();
    }

    /**
     * Invokes {@code consumeMessageBefore} on every registered hook. A failing hook is logged and
     * does not prevent the remaining hooks from running.
     */
    public void executeHookBefore(final ConsumeMessageContext context) {
        if (!this.consumeMessageHookList.isEmpty()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageBefore(context);
                } catch (Throwable e) {
                    log.warn("consumeMessageHook executeHookBefore exception", e);
                }
            }
        }
    }

    /**
     * Invokes {@code consumeMessageAfter} on every registered hook. A failing hook is logged and
     * does not prevent the remaining hooks from running.
     */
    public void executeHookAfter(final ConsumeMessageContext context) {
        if (!this.consumeMessageHookList.isEmpty()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageAfter(context);
                } catch (Throwable e) {
                    log.warn("consumeMessageHook executeHookAfter exception", e);
                }
            }
        }
    }

    @Override
    public MessageModel messageModel() {
        return this.defaultMQPullConsumer.getMessageModel();
    }

    @Override
    public ConsumeType consumeType() {
        return ConsumeType.CONSUME_ACTIVELY;
    }

    @Override
    public ConsumeFromWhere consumeFromWhere() {
        return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
    }

    /**
     * Builds a subscribe-all {@link SubscriptionData} (sub version {@code 0}) for every registered
     * topic of the consumer. Topics whose subscription cannot be built are logged and skipped.
     */
    @Override
    public Set<SubscriptionData> subscriptions() {
        Set<SubscriptionData> result = new HashSet<SubscriptionData>();

        Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics();
        if (topics != null) {
            synchronized (topics) {
                for (String t : topics) {
                    SubscriptionData ms;
                    try {
                        ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL);
                    } catch (Exception e) {
                        log.error("parse subscription error", e);
                        // Nothing was built for this topic; skip it instead of dereferencing null.
                        continue;
                    }
                    ms.setSubVersion(0L);
                    result.add(ms);
                }
            }
        }

        return result;
    }

    @Override
    public void doRebalance() {
        if (this.rebalanceImpl != null) {
            this.rebalanceImpl.doRebalance(false);
        }
    }

    /**
     * Persists the offsets of all queues currently in the process-queue table. Any failure
     * (including the consumer not being in state RUNNING) is logged, not propagated.
     */
    @Override
    public void persistConsumerOffset() {
        try {
            this.makeSureStateOK();
            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            if (allocateMq != null) {
                mqs.addAll(allocateMq);
            }
            this.offsetStore.persistAll(mqs);
        } catch (Exception e) {
            log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
        }
    }

    /** Stores the queue set of {@code topic}, but only for topics this consumer is subscribed to. */
    @Override
    public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
        Map<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner();
        if (subTable != null) {
            if (subTable.containsKey(topic)) {
                this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info);
            }
        }
    }

    /**
     * @return {@code true} if {@code topic} is subscribed but no queue information has been stored
     *         for it yet; {@code false} otherwise
     */
    @Override
    public boolean isSubscribeTopicNeedUpdate(String topic) {
        Map<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner();
        if (subTable != null) {
            if (subTable.containsKey(topic)) {
                return !this.rebalanceImpl.getTopicSubscribeInfoTable().containsKey(topic);
            }
        }

        return false;
    }

    @Override
    public boolean isUnitMode() {
        return this.defaultMQPullConsumer.isUnitMode();
    }

    /** Reports the consumer's properties, its start timestamp and its current subscriptions. */
    @Override
    public ConsumerRunningInfo consumerRunningInfo() {
        ConsumerRunningInfo info = new ConsumerRunningInfo();

        Properties prop = MixAll.object2Properties(this.defaultMQPullConsumer);
        prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));
        info.setProperties(prop);

        info.getSubscriptionSet().addAll(this.subscriptions());
        return info;
    }

    /** Asynchronous, non-blocking pull using the consumer's configured pull timeout. */
    public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
            throws MQClientException, RemotingException, InterruptedException {
        pull(mq, subExpression, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
    }

    /** Asynchronous, non-blocking pull with an explicit timeout in milliseconds. */
    public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
            throws MQClientException, RemotingException, InterruptedException {
        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
    }

    /**
     * Shared implementation of the asynchronous pull variants. The caller's callback receives the
     * pull result after it has been passed through {@code PullAPIWrapper.processPullResult}.
     *
     * @throws MQClientException if the consumer is not running, an argument is invalid
     *                           ({@code mq}/{@code pullCallback} null, {@code offset < 0},
     *                           {@code maxNums <= 0}), the subscription expression cannot be parsed,
     *                           or the broker reports an error
     */
    private void pullAsyncImpl(//
                               final MessageQueue mq, //
                               final String subExpression, //
                               final long offset, //
                               final int maxNums, //
                               final PullCallback pullCallback, //
                               final boolean block, //
                               final long timeout) throws MQClientException, RemotingException, InterruptedException {
        this.makeSureStateOK();

        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }

        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }

        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }

        if (null == pullCallback) {
            throw new MQClientException("pullCallback is null", null);
        }

        this.subscriptionAutomatically(mq.getTopic());

        try {
            int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

            final SubscriptionData subscriptionData;
            try {
                subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
                        mq.getTopic(), subExpression);
            } catch (Exception e) {
                throw new MQClientException("parse subscription error", e);
            }

            long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

            this.pullAPIWrapper.pullKernelImpl(//
                    mq, // 1
                    subscriptionData.getSubString(), // 2
                    0L, // 3
                    offset, // 4
                    maxNums, // 5
                    sysFlag, // 6
                    0, // 7
                    this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
                    timeoutMillis, // 9
                    CommunicationMode.ASYNC, // 10
                    new PullCallback() {

                        @Override
                        public void onSuccess(PullResult pullResult) {
                            pullCallback
                                    .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
                        }

                        @Override
                        public void onException(Throwable e) {
                            pullCallback.onException(e);
                        }
                    });
        } catch (MQBrokerException e) {
            throw new MQClientException("pullAsync unknow exception", e);
        }
    }

    /** Synchronous pull in blocking (suspend) mode. */
    public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
    }

    public DefaultMQPullConsumer getDefaultMQPullConsumer() {
        return defaultMQPullConsumer;
    }

    /** Asynchronous pull in blocking (suspend) mode. */
    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
            throws MQClientException, RemotingException, InterruptedException {
        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
                this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
    }

    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
            throws MQClientException, InterruptedException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
    }

    public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
            throws MQClientException, InterruptedException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
    }

    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
    }

    /** Same as the four-argument overload, using this consumer's own group. */
    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup());
    }

    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
            MQBrokerException, InterruptedException, MQClientException {
        this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
    }

    /**
     * Sends {@code msg} back to the broker for later re-delivery. If the direct send-back call
     * fails for any reason, the failure is logged and the message is instead re-published to the
     * group's retry topic through the client's default producer.
     *
     * @param brokerName    broker to send to; when {@code null} the address is derived from the
     *                      message's store host
     * @param consumerGroup group to send back for; blank falls back to this consumer's group
     */
    public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());

            if (UtilAll.isBlank(consumerGroup)) {
                consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
            }

            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
                    this.defaultMQPullConsumer.getMaxReconsumeTimes());
        } catch (Exception e) {
            log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);

            // Fallback: publish a copy of the message to the retry topic ourselves.
            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()), msg.getBody());
            String originMsgId = MessageAccessor.getOriginMessageId(msg);
            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
            newMsg.setFlag(msg.getFlag());
            MessageAccessor.setProperties(newMsg, msg.getProperties());
            MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes()));
            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        }
    }

    /**
     * Stops a running consumer: persists offsets, unregisters from and shuts down the client
     * instance, and moves to {@code SHUTDOWN_ALREADY}. Does nothing in any other state.
     */
    public void shutdown() {
        switch (this.serviceState) {
            case CREATE_JUST:
                break;
            case RUNNING:
                this.persistConsumerOffset();
                this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
                this.mQClientFactory.shutdown();
                log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                break;
            case SHUTDOWN_ALREADY:
                break;
            default:
                break;
        }
    }

    /**
     * Starts the consumer. Only valid in state {@code CREATE_JUST}; the state is set to
     * {@code START_FAILED} up front and becomes {@code RUNNING} only after every step succeeded.
     *
     * @throws MQClientException if the configuration is invalid, the consumer group is already
     *                           registered on the client instance, or the consumer was started before
     */
    public void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                this.copySubscription();

                if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPullConsumer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);

                this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                this.pullAPIWrapper = new PullAPIWrapper(//
                        mQClientFactory, //
                        this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                // A user-supplied offset store wins; otherwise pick one by message model.
                if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPullConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                }

                this.offsetStore.load();

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    // Allow another start attempt after the caller fixed the group name.
                    this.serviceState = ServiceState.CREATE_JUST;

                    throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//
                        + this.serviceState//
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
            default:
                break;
        }
    }

    /**
     * Validates the consumer configuration before start.
     *
     * @throws MQClientException if the group is rejected by {@code Validators.checkGroup}, is null
     *                           or equals the default consumer group; if the message model or the
     *                           allocate strategy is null; or if {@code consumerTimeoutMillisWhenSuspend}
     *                           is smaller than {@code brokerSuspendMaxTimeMillis}
     */
    private void checkConfig() throws MQClientException {
        // check consumerGroup
        Validators.checkGroup(this.defaultMQPullConsumer.getConsumerGroup());

        // consumerGroup
        if (null == this.defaultMQPullConsumer.getConsumerGroup()) {
            throw new MQClientException(
                    "consumerGroup is null" //
                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
                    null);
        }

        // consumerGroup
        if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
            throw new MQClientException(
                    "consumerGroup can not equal "//
                            + MixAll.DEFAULT_CONSUMER_GROUP //
                            + ", please specify another one."//
                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
                    null);
        }

        // messageModel
        if (null == this.defaultMQPullConsumer.getMessageModel()) {
            throw new MQClientException(
                    "messageModel is null" //
                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
                    null);
        }

        // allocateMessageQueueStrategy
        if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {
            throw new MQClientException(
                    "allocateMessageQueueStrategy is null" //
                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
                    null);
        }

        // consumerTimeoutMillisWhenSuspend vs. brokerSuspendMaxTimeMillis
        if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) {
            throw new MQClientException(
                    "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" //
                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
                    null);
        }
    }

    /**
     * Copies the consumer's registered topics into the rebalance implementation as subscribe-all
     * subscriptions.
     *
     * @throws MQClientException wrapping any failure while building or storing a subscription
     */
    private void copySubscription() throws MQClientException {
        try {
            Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
            if (registerTopics != null) {
                for (final String topic : registerTopics) {
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
                            topic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

    public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
        this.makeSureStateOK();
        this.offsetStore.updateOffset(mq, offset, false);
    }

    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        this.makeSureStateOK();
        return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
    }

    /** Adds a filter hook; the list is handed to the pull API wrapper when the consumer starts. */
    public void registerFilterMessageHook(final FilterMessageHook hook) {
        this.filterMessageHookList.add(hook);
        log.info("register FilterMessageHook Hook, {}", hook.hookName());
    }

    public OffsetStore getOffsetStore() {
        return offsetStore;
    }

    public void setOffsetStore(OffsetStore offsetStore) {
        this.offsetStore = offsetStore;
    }

    public PullAPIWrapper getPullAPIWrapper() {
        return pullAPIWrapper;
    }

    public void setPullAPIWrapper(PullAPIWrapper pullAPIWrapper) {
        this.pullAPIWrapper = pullAPIWrapper;
    }

    public ServiceState getServiceState() {
        return serviceState;
    }

    public void setServiceState(ServiceState serviceState) {
        this.serviceState = serviceState;
    }

    public long getConsumerStartTimestamp() {
        return consumerStartTimestamp;
    }


    public RebalanceImpl getRebalanceImpl() {
        return rebalanceImpl;
    }
}
