This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch litePullConsumer in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 11b686e47797be15f187f906e4064264dfa2c993 Author: duhenglucky <[email protected]> AuthorDate: Wed Jul 17 14:33:29 2019 +0800 Add lite pull consumer example --- .../client/consumer/DefaultLiteMQPullConsumer.java | 82 ++++++++++++++++++++-- .../client/consumer/LiteMQPullConsumer.java | 3 - .../impl/consumer/LiteMQPullConsumerImpl.java | 77 +++++++++++++++++--- .../example/simple/LitePullConsumerTest.java | 49 +++++++++++++ 4 files changed, 192 insertions(+), 19 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java index 99fd0d9..96d4f5a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java @@ -27,18 +27,50 @@ import org.apache.rocketmq.remoting.RPCHook; public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements LiteMQPullConsumer { private LiteMQPullConsumerImpl liteMQPullConsumer; + /** + * Maximum amount of time in minutes a message may block the consuming thread. + */ + private long consumeTimeout = 15; + + /** + * Is auto commit offset + */ + private boolean autoCommit = true; + + private int pullThreadNumbers = 20; + + /** + * Maximum commit offset interval time in seconds. + */ + private long autoCommitInterval = 20; + public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) { + this.setConsumerGroup(consumerGroup); this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, rpcHook); } - @Override public void subscribe(String topic, String subExpression) throws MQClientException{ + public DefaultLiteMQPullConsumer(String consumerGroup) { + this.setConsumerGroup(consumerGroup); + this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, null); + } + + @Override + public void start() throws MQClientException{ + this.liteMQPullConsumer.start(); + } + + @Override + public void subscribe(String topic, String subExpression) throws MQClientException { this.liteMQPullConsumer.subscribe(topic, subExpression); } - @Override public void unsubscribe(String topic) { + @Override + public void unsubscribe(String topic) { + this.liteMQPullConsumer.unsubscribe(topic); } - @Override public List<MessageExt> poll() { + @Override + public List<MessageExt> poll() { return poll(this.getConsumerPullTimeoutMillis()); } @@ -46,19 +78,55 @@ public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements return liteMQPullConsumer.poll(timeout); } - @Override public void seek(MessageQueue messageQueue, long offset) throws MQClientException { + @Override + public void seek(MessageQueue messageQueue, long offset) throws MQClientException { + this.liteMQPullConsumer.seek(messageQueue, offset); + } + + @Override + public void pause(Collection<MessageQueue> messageQueues) { + this.liteMQPullConsumer.pause(messageQueues); + } + + @Override + public void resume(Collection<MessageQueue> messageQueues) { + this.liteMQPullConsumer.resume(messageQueues); + } + + @Override + public void commitSync() { + this.liteMQPullConsumer.commit(); + } + public long getConsumeTimeout() { + return consumeTimeout; } - @Override public void pause(Collection<MessageQueue> messageQueueCollection) { + public void setConsumeTimeout(long consumeTimeout) { + this.consumeTimeout = consumeTimeout; + } + public boolean isAutoCommit() { + return autoCommit; } - @Override public void resume(Collection<MessageQueue> partitions) { + public void setAutoCommit(boolean autoCommit) { + this.autoCommit = autoCommit; + } + public int getPullThreadNumbers() { + return pullThreadNumbers; } - @Override public void commitSync() { + public void setPullThreadNumbers(int pullThreadNumbers) { + this.pullThreadNumbers = pullThreadNumbers; + } + + public long getAutoCommitInterval() { + return autoCommitInterval; + } + public void setAutoCommitInterval(long autoCommitInterval) { + this.autoCommitInterval = autoCommitInterval; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java index 223cca0..da8d1cf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java @@ -38,9 +38,6 @@ public interface LiteMQPullConsumer { */ void unsubscribe(final String topic); - /** - * @return - */ List<MessageExt> poll(); List<MessageExt> poll(long timeout); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java index abf5f47..d612286 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java @@ -27,12 +27,14 @@ import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import org.apache.commons.lang3.reflect.FieldUtils; -import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer; import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.exception.MQClientException; @@ -50,6 +52,8 @@ import org.apache.rocketmq.remoting.RPCHook; public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { private final InternalLogger log = ClientLogger.getLog(); + private DefaultLiteMQPullConsumer defaultLiteMQPullConsumer; + private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable = new ConcurrentHashMap<MessageQueue, PullTaskImpl>(); @@ -58,12 +62,21 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { private List<ConsumeRequest> allConsumed = new ArrayList<ConsumeRequest>(256); private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>(); - ; + + private final ScheduledExecutorService cleanExpireMsgExecutors; private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; - public LiteMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) { + private ScheduledExecutorService autoCommitExecutors; + + public LiteMQPullConsumerImpl(final DefaultLiteMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) { super(defaultMQPullConsumer, rpcHook); + this.defaultLiteMQPullConsumer = defaultMQPullConsumer; + this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "Lite_CleanExpireMsgScheduledThread_")); + this.autoCommitExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "Lite_AutoCommitScheduledThread_")); + } public void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) { @@ -115,18 +128,43 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { @Override public synchronized void start() throws MQClientException { + this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); super.start(); final String group = this.defaultMQPullConsumer.getConsumerGroup(); this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( - 10, //this.pullThreadNums, + this.defaultLiteMQPullConsumer.getPullThreadNumbers(), new ThreadFactoryImpl("PullMsgThread-" + group) ); - this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); + this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + cleanExpireMsg(); + } + }, this.defaultLiteMQPullConsumer.getConsumeTimeout(), this.defaultLiteMQPullConsumer.getConsumeTimeout(), TimeUnit.MINUTES); + this.autoCommitExecutors.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + if (defaultLiteMQPullConsumer.isAutoCommit()) { + commit(); + } + } + }, this.defaultLiteMQPullConsumer.getAutoCommitInterval(), this.defaultLiteMQPullConsumer.getAutoCommitInterval(), TimeUnit.SECONDS); + updateTopicSubscribeInfoWhenSubscriptionChanged(); + } + + private void updateTopicSubscribeInfoWhenSubscriptionChanged() { + Map<String, SubscriptionData> subTable = rebalanceImpl.getSubscriptionInner(); + if (subTable != null) { + for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { + final String topic = entry.getKey(); + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + } + } } public List<MessageExt> poll(long timeout) { try { - ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS); + ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.SECONDS); if (consumeRequest != null) { List<MessageExt> messages = consumeRequest.getMessageExts(); for (MessageExt messageExt : messages) { @@ -148,6 +186,16 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { assignedMessageQueue.resume(messageQueues); } + public void seek(MessageQueue messageQueue, long offset) throws MQClientException { + this.updatePullOffset(messageQueue, offset); + try { + updateConsumeOffset(messageQueue, offset); + } catch (MQClientException ex) { + log.error("Seek offset to remote message queue error!", ex); + throw ex; + } + } + public void unsubscribe(final String topic) { unsubscribe(topic); removePullTaskCallback(topic); @@ -270,7 +318,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { if (!msgTreeMap.isEmpty()) { msg = msgTreeMap.firstEntry().getValue(); if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg)) - > 10 * 60 * 1000) { + > this.defaultLiteMQPullConsumer.getConsumeTimeout() * 60 * 1000) { //Expired, ack and remove it. } else { break; @@ -316,16 +364,19 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { @Override public void run() { + System.out.println("begin pull message"); String topic = this.messageQueue.getTopic(); if (!this.isCancelled()) { if (assignedMessageQueue.isPaused(messageQueue)) { + scheduledThreadPoolExecutor.schedule(this, 1000, TimeUnit.MILLISECONDS); log.debug("Message Queue: {} has been paused!", messageQueue); return; } SubscriptionData subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); long offset = nextPullOffset(messageQueue); + long pullDelayTimeMills = 0; try { - PullResult pullResult = defaultMQPullConsumer.pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums()); + PullResult pullResult = pull(messageQueue, subscriptionData.getSubString(), offset, nextPullBatchNums()); ProcessQueue processQueue = rebalanceImpl.getProcessQueueTable().get(messageQueue); switch (pullResult.getPullStatus()) { case FOUND: @@ -338,9 +389,17 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { break; } updatePullOffset(messageQueue, pullResult.getNextBeginOffset()); - } catch (Exception e) { + } catch (Throwable e) { + pullDelayTimeMills = 1000; + e.printStackTrace(); log.error("An error occurred in pull message process.", e); } + + if (!this.isCancelled()) { + scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS); + } else { + log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue); + } } } diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java new file mode 100644 index 0000000..4297e4f --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.simple; + +import java.util.Arrays; +import java.util.List; +import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +public class LitePullConsumerTest { + public static void main(String[] args) throws Exception { + DefaultLiteMQPullConsumer litePullConsumer = new DefaultLiteMQPullConsumer("test", null); + litePullConsumer.subscribe("test", null); + litePullConsumer.start(); + MessageQueue messageQueue = new MessageQueue("test", "duhengdeMacBook-Pro.local", 1); + int i = 0; + while (true) { + List<MessageExt> messageExts = litePullConsumer.poll(); + System.out.println("-----------"); + System.out.println(messageExts); + System.out.println("-----------"); + i++; + if (i == 3) { + System.out.println("pause"); + litePullConsumer.pause(Arrays.asList(messageQueue)); + } + if (i == 10) { + System.out.println("resume"); + litePullConsumer.resume(Arrays.asList(messageQueue)); + } + litePullConsumer.commitSync(); + } + } +}
