This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch litePullConsumer in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit af04557abf12ce36245d7e362925125cd25e0d3c Author: duhenglucky <[email protected]> AuthorDate: Fri Jun 21 09:47:42 2019 +0800 Add pull task logic --- .../client/consumer/DefaultLiteMQPullConsumer.java | 64 ++++++++++ .../client/consumer/DefaultMQPullConsumer.java | 2 + .../client/consumer/LiteMQPullConsumer.java | 55 ++++++++ .../rocketmq/client/consumer/MQPullConsumer.java | 3 + .../impl/consumer/LiteMQPullConsumerImpl.java | 140 ++++++++++++++++++++- 5 files changed, 263 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java new file mode 100644 index 0000000..99fd0d9 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import java.util.Collection; +import java.util.List; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.LiteMQPullConsumerImpl; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.RPCHook; + +public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements LiteMQPullConsumer { + private LiteMQPullConsumerImpl liteMQPullConsumer; + + public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) { + this.liteMQPullConsumer = new LiteMQPullConsumerImpl(this, rpcHook); + } + + @Override public void subscribe(String topic, String subExpression) throws MQClientException{ + this.liteMQPullConsumer.subscribe(topic, subExpression); + } + + @Override public void unsubscribe(String topic) { + } + + @Override public List<MessageExt> poll() { + return poll(this.getConsumerPullTimeoutMillis()); + } + + @Override public List<MessageExt> poll(long timeout) { + return liteMQPullConsumer.poll(timeout); + } + + @Override public void seek(MessageQueue messageQueue, long offset) throws MQClientException { + + } + + @Override public void pause(Collection<MessageQueue> messageQueueCollection) { + + } + + @Override public void resume(Collection<MessageQueue> partitions) { + + } + + @Override public void commitSync() { + + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index f3b6caa..dbf37d2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -16,7 +16,9 @@ */ package org.apache.rocketmq.client.consumer; +import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java new file mode 100644 index 0000000..223cca0 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LiteMQPullConsumer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import java.util.Collection; +import java.util.List; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +public interface LiteMQPullConsumer { + /** + * Subscribe some topic + * + * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if + * null or * expression,meaning subscribe all + */ + void subscribe(final String topic, final String subExpression) throws MQClientException; + + /** + * Unsubscribe consumption some topic + * + * @param topic message topic + */ + void unsubscribe(final String topic); + + /** + * @return + */ + List<MessageExt> poll(); + + List<MessageExt> poll(long timeout); + + void seek(MessageQueue messageQueue, long offset) throws MQClientException; + + void pause(Collection<MessageQueue> messageQueueCollection); + + void resume(Collection<MessageQueue> partitions); + + void commitSync(); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java index 28b807c..9c7cb36 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.client.consumer; +import java.util.Collection; +import java.util.List; import java.util.Set; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -169,4 +171,5 @@ public interface MQPullConsumer extends MQConsumer { */ void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java index 7332818..abf5f47 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java @@ -17,16 +17,21 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.PullResult; @@ -34,6 +39,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.filter.FilterAPI; +import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -118,6 +124,81 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); } + public List<MessageExt> poll(long timeout) { + try { + ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS); + if (consumeRequest != null) { + List<MessageExt> messages = consumeRequest.getMessageExts(); + for (MessageExt messageExt : messages) { + MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis())); + } + consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); + return messages; + } + } catch (InterruptedException ignore) { + } + return null; + } + + public void pause(Collection<MessageQueue> messageQueues) { + assignedMessageQueue.pause(messageQueues); + } + + public void resume(Collection<MessageQueue> messageQueues) { + assignedMessageQueue.resume(messageQueues); + } + + public void unsubscribe(final String topic) { + unsubscribe(topic); + removePullTaskCallback(topic); + assignedMessageQueue.removeAssignedMessageQueue(topic); + } + + public void removePullTaskCallback(final String topic) { + removePullTask(topic); + } + + public void removePullTask(final String topic) { + synchronized (this.taskTable) { + Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<MessageQueue, PullTaskImpl> next = it.next(); + if (next.getKey().getTopic().equals(topic)) { + it.remove(); + } + } + } + } + + public void commit() { + List<ConsumeRequest> consumeRequests; + synchronized (this.allConsumed) { + consumeRequests = this.allConsumed; + this.allConsumed = new ArrayList<ConsumeRequest>(); + } + for (ConsumeRequest consumeRequest : consumeRequests) { + consumeRequest.getProcessQueue().removeMessage(consumeRequest.messageExts); + } + Set<Map.Entry<MessageQueue, Long>> entrySet = assignedMessageQueue.getNeedCommitOffsets().entrySet(); + for (Map.Entry<MessageQueue, Long> entry : entrySet) { + try { + updateConsumeOffset(entry.getKey(), entry.getValue()); + } catch (MQClientException e) { + log.error("A error occurred in update consume offset process.", e); + } + } + this.getOffsetStore().persistAll(assignedMessageQueue.getNeedCommitOffsets().keySet()); + } + + private void commit(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) { + long offset = processQueue.removeMessage(Collections.singletonList(messageExt)); + try { + updateConsumeOffset(messageQueue, offset); + } catch (MQClientException e) { + log.error("An error occurred in update consume offset process.", e); + } + } + public void subscribe(String topic, String subExpression) throws MQClientException { try { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultMQPullConsumer.getConsumerGroup(), @@ -159,7 +240,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { try { offset = assignedMessageQueue.getNextOffset(remoteQueue); if (offset == -1) { - offset = this.defaultMQPullConsumer.fetchConsumeOffset(remoteQueue, false); + offset = fetchConsumeOffset(remoteQueue, false); assignedMessageQueue.updateNextOffset(remoteQueue, offset); } } catch (MQClientException e) { @@ -168,6 +249,63 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { return offset; } + private void cleanExpireMsg() { + for (final Map.Entry<MessageQueue, ProcessQueue> next : rebalanceImpl.getProcessQueueTable().entrySet()) { + ProcessQueue pq = next.getValue(); + MessageQueue mq = next.getKey(); + ReadWriteLock lockTreeMap = getLockInProcessQueue(pq); + if (lockTreeMap == null) { + log.error("Gets tree map lock in process queue error of message queue:", mq); + return; + } + + TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap(); + + int loop = msgTreeMap.size(); + for (int i = 0; i < loop; i++) { + MessageExt msg = null; + try { + lockTreeMap.readLock().lockInterruptibly(); + try { + if (!msgTreeMap.isEmpty()) { + msg = msgTreeMap.firstEntry().getValue(); + if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg)) + > 10 * 60 * 1000) { + //Expired, ack and remove it. + } else { + break; + } + } else { + break; + } + } finally { + lockTreeMap.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("Gets expired message exception", e); + } + + try { + this.defaultMQPullConsumer.sendMessageBack(msg, 3); + log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", + msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); + System.out.println("Send expired msg back."); + commit(mq, pq, msg); + } catch (Exception e) { + log.error("Send back expired msg exception", e); + } + } + } + } + + private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) { + try { + return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true); + } catch (IllegalAccessException e) { + return null; + } + } + public class PullTaskImpl implements Runnable { private final MessageQueue messageQueue; private volatile boolean cancelled = false;
