This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch litePullConsumer in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 2aae40f9256cb0a6a2b628f00df7dbe9718a963d Author: duhenglucky <[email protected]> AuthorDate: Thu Jun 20 23:32:33 2019 +0800 Add pull schedual service --- .../consumer/MQPullConsumerScheduleService.java | 2 +- .../client/impl/consumer/AssignedMessageQueue.java | 157 +++++++++++++ .../impl/consumer/DefaultMQPullConsumerImpl.java | 6 +- .../impl/consumer/LiteMQPullConsumerImpl.java | 255 +++++++++++++++++++++ 4 files changed, 416 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java index 44b864e..685f4c8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java @@ -151,7 +151,7 @@ public class MQPullConsumerScheduleService { } } - class PullTaskImpl implements Runnable { + public class PullTaskImpl implements Runnable { private final MessageQueue messageQueue; private volatile boolean cancelled = false; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java new file mode 100644 index 0000000..e9623a8 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageQueue; + +public class AssignedMessageQueue { + + private ConcurrentHashMap<MessageQueue, MessageQueueStat> assignedMessageQueueState; + + public AssignedMessageQueue() { + assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueStat>(); + } + + public boolean isPaused(MessageQueue messageQueue) { + MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); + if (messageQueueStat != null) { + return messageQueueStat.isPaused(); + } + return false; + } + + public void pause(Collection<MessageQueue> messageQueues) { + for (MessageQueue messageQueue : messageQueues) { + MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); + if (assignedMessageQueueState.get(messageQueue) != null) { + messageQueueStat.setPaused(true); + } + } + } + + public void resume(Collection<MessageQueue> messageQueueCollection) { + for (MessageQueue messageQueue : messageQueueCollection) { + MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); + if (assignedMessageQueueState.get(messageQueue) != null) { + messageQueueStat.setPaused(false); + } + } + } + + public long getNextOffset(MessageQueue messageQueue) throws MQClientException { + MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); + if (assignedMessageQueueState.get(messageQueue) != null) { + return messageQueueStat.getNextOffset(); + } + return -1; + } + + public void updateNextOffset(MessageQueue messageQueue, long offset) throws MQClientException { + MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue); + if (messageQueue == null) { + messageQueueStat = new MessageQueueStat(messageQueue, offset); + assignedMessageQueueState.putIfAbsent(messageQueue, messageQueueStat); + } + assignedMessageQueueState.get(messageQueue).setNextOffset(offset); + } + + public void updateAssignedMessageQueue(Set<MessageQueue> assigned) { + synchronized (this.assignedMessageQueueState) { + Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<MessageQueue, MessageQueueStat> next = it.next(); + if (!assigned.contains(next.getKey())) { + it.remove(); + } + } + + for (MessageQueue messageQueue : assigned) { + if (!this.assignedMessageQueueState.containsKey(messageQueue)) { + MessageQueueStat messageQueueStat = new MessageQueueStat(messageQueue); + this.assignedMessageQueueState.put(messageQueue, messageQueueStat); + } + } + } + } + + public void removeAssignedMessageQueue(String topic) { + synchronized (this.assignedMessageQueueState) { + Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<MessageQueue, MessageQueueStat> next = it.next(); + if (next.getKey().getTopic().equals(topic)) { + it.remove(); + } + } + } + } + + public Map<MessageQueue, Long> getNeedCommitOffsets() { + Map<MessageQueue, Long> map = new HashMap<MessageQueue, Long>(); + Set<Map.Entry<MessageQueue, MessageQueueStat>> entries = this.assignedMessageQueueState.entrySet(); + for (Map.Entry<MessageQueue, MessageQueueStat> entry : entries) { + map.put(entry.getKey(), entry.getValue().getNextOffset()); + } + return map; + } + + public class MessageQueueStat { + private MessageQueue messageQueue; + private boolean paused = false; + private long nextOffset = -1; + + public MessageQueueStat(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + public MessageQueueStat(MessageQueue messageQueue, long nextOffset) { + this.messageQueue = messageQueue; + this.nextOffset = nextOffset; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public void setMessageQueue(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + public boolean isPaused() { + return paused; + } + + public void setPaused(boolean paused) { + this.paused = paused; + } + + public long getNextOffset() { + return nextOffset; + } + + public void setNextOffset(long nextOffset) { + this.nextOffset = nextOffset; + } + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 8aff14b..bc0884a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -68,16 +68,16 @@ import org.apache.rocketmq.remoting.exception.RemotingException; public class DefaultMQPullConsumerImpl implements MQConsumerInner { private final InternalLogger log = ClientLogger.getLog(); - private final DefaultMQPullConsumer defaultMQPullConsumer; + protected final DefaultMQPullConsumer defaultMQPullConsumer; private final long consumerStartTimestamp = System.currentTimeMillis(); private final RPCHook rpcHook; private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>(); private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); private volatile ServiceState serviceState = ServiceState.CREATE_JUST; - private MQClientInstance mQClientFactory; + protected MQClientInstance mQClientFactory; private PullAPIWrapper pullAPIWrapper; private OffsetStore offsetStore; - private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this); + protected RebalanceImpl rebalanceImpl = new RebalancePullImpl(this); public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) { this.defaultMQPullConsumer = defaultMQPullConsumer; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java new file mode 100644 index 0000000..7332818 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MessageQueueListener; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.filter.FilterAPI; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.RPCHook; + +public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { + private final InternalLogger log = ClientLogger.getLog(); + + private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable = + new ConcurrentHashMap<MessageQueue, PullTaskImpl>(); + + private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue(); + + private List<ConsumeRequest> allConsumed = new ArrayList<ConsumeRequest>(256); + + private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>(); + ; + + private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + + public LiteMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) { + super(defaultMQPullConsumer, rpcHook); + } + + public void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) { + this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue); + updatePullTask(topic, assignedMessageQueue); + } + + public void updatePullTask(String topic, Set<MessageQueue> mqNewSet) { + Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<MessageQueue, PullTaskImpl> next = it.next(); + if (next.getKey().getTopic().equals(topic)) { + if (!mqNewSet.contains(next.getKey())) { + next.getValue().setCancelled(true); + it.remove(); + } + } + } + + for (MessageQueue messageQueue : mqNewSet) { + if (!this.taskTable.containsKey(messageQueue)) { + PullTaskImpl pullTask = new PullTaskImpl(messageQueue); + this.taskTable.put(messageQueue, pullTask); + this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS); + } + } + } + + class MessageQueueListenerImpl implements MessageQueueListener { + @Override + public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { + MessageModel messageModel = defaultMQPullConsumer.getMessageModel(); + switch (messageModel) { + case BROADCASTING: + updateAssignedMessageQueue(topic, mqAll); + break; + case CLUSTERING: + updateAssignedMessageQueue(topic, mqDivided); + break; + default: + break; + } + } + } + + int nextPullBatchNums() { + return Math.min(10, consumeRequestCache.remainingCapacity()); + } + + @Override + public synchronized void start() throws MQClientException { + super.start(); + final String group = this.defaultMQPullConsumer.getConsumerGroup(); + this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( + 10, //this.pullThreadNums, + new ThreadFactoryImpl("PullMsgThread-" + group) + ); + this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); + } + + public void subscribe(String topic, String subExpression) throws MQClientException { + try { + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultMQPullConsumer.getConsumerGroup(), + topic, subExpression); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + if (this.mQClientFactory != null) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + } + } catch (Exception e) { + throw new MQClientException("subscription exception", e); + } + } + + void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) { + try { + assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset); + } catch (MQClientException e) { + log.error("A error occurred in update consume: {} offset process.", remoteQueue, e); + } + } + + private void addToConsumed(ConsumeRequest consumeRequest) { + synchronized (this.allConsumed) { + allConsumed.add(consumeRequest); + } + } + + void submitConsumeRequest(ConsumeRequest consumeRequest) { + try { + consumeRequestCache.put(consumeRequest); + addToConsumed(consumeRequest); + } catch (InterruptedException ex) { + log.error("Submit consumeRequest error", ex); + } + } + + long nextPullOffset(MessageQueue remoteQueue) { + long offset = -1; + try { + offset = assignedMessageQueue.getNextOffset(remoteQueue); + if (offset == -1) { + offset = this.defaultMQPullConsumer.fetchConsumeOffset(remoteQueue, false); + assignedMessageQueue.updateNextOffset(remoteQueue, offset); + } + } catch (MQClientException e) { + log.error("An error occurred in fetch consume offset process.", e); + } + return offset; + } + + public class PullTaskImpl implements Runnable { + private final MessageQueue messageQueue; + private volatile boolean cancelled = false; + + public PullTaskImpl(final MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + @Override + public void run() { + String topic = this.messageQueue.getTopic(); + if (!this.isCancelled()) { + if (assignedMessageQueue.isPaused(messageQueue)) { + log.debug("Message Queue: {} has been paused!", messageQueue); + return; + } + SubscriptionData subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); + long offset = nextPullOffset(messageQueue); + try { + PullResult pullResult = defaultMQPullConsumer.pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums()); + ProcessQueue processQueue = rebalanceImpl.getProcessQueueTable().get(messageQueue); + switch (pullResult.getPullStatus()) { + case FOUND: + if (processQueue != null) { + processQueue.putMessage(pullResult.getMsgFoundList()); + submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue)); + } + break; + default: + break; + } + updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); + } catch (Exception e) { + log.error("An error occurred in pull message process.", e); + } + } + } + + public boolean isCancelled() { + return cancelled; + } + + public void setCancelled(boolean cancelled) { + this.cancelled = cancelled; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + } + + public class ConsumeRequest { + private final List<MessageExt> messageExts; + private final MessageQueue messageQueue; + private final ProcessQueue processQueue; + private long startConsumeTimeMillis; + + public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue messageQueue, + final ProcessQueue processQueue) { + this.messageExts = messageExts; + this.messageQueue = messageQueue; + this.processQueue = processQueue; + } + + public List<MessageExt> getMessageExts() { + return messageExts; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public ProcessQueue getProcessQueue() { + return processQueue; + } + + public long getStartConsumeTimeMillis() { + return startConsumeTimeMillis; + } + + public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) { + this.startConsumeTimeMillis = startConsumeTimeMillis; + } + } +}
