http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java new file mode 100644 index 0000000..c4e91a3 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -0,0 +1,381 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.HashSet; +import java.util.Set; + + +/** + * Default pulling consumer + * + * @author shijia.wxr + */ +public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer { + protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl; + + /** + * Do the same thing for the same Group, the application must be set,and + * guarantee Globally unique + */ + private String consumerGroup; + /** + * Long polling mode, the Consumer connection max suspend time, it is not + * recommended to modify + */ + private long brokerSuspendMaxTimeMillis = 1000 * 20; + /** + * Long polling mode, the Consumer connection timeout(must greater than + * brokerSuspendMaxTimeMillis), it is not recommended to modify + */ + private long consumerTimeoutMillisWhenSuspend = 1000 * 30; + /** + * The socket timeout in milliseconds + */ + private long consumerPullTimeoutMillis = 1000 * 10; + /** + * Consumption pattern,default is clustering + */ + private MessageModel messageModel = MessageModel.CLUSTERING; + /** + * Message queue listener + */ + private 
MessageQueueListener messageQueueListener; + /** + * Offset Storage + */ + private OffsetStore offsetStore; + /** + * Topic set you want to register + */ + private Set<String> registerTopics = new HashSet<String>(); + /** + * Queue allocation algorithm + */ + private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); + /** + * Whether the unit of subscription group + */ + private boolean unitMode = false; + + private int maxReconsumeTimes = 16; + + + public DefaultMQPullConsumer() { + this(MixAll.DEFAULT_CONSUMER_GROUP, null); + } + + + public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) { + this.consumerGroup = consumerGroup; + defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook); + } + + + public DefaultMQPullConsumer(final String consumerGroup) { + this(consumerGroup, null); + } + + + public DefaultMQPullConsumer(RPCHook rpcHook) { + this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + createTopic(key, newTopic, queueNum, 0); + } + + + @Override + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); + } + + + @Override + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp); + } + + + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return this.defaultMQPullConsumerImpl.maxOffset(mq); + } + + + @Override + public long minOffset(MessageQueue mq) throws MQClientException { + return this.defaultMQPullConsumerImpl.minOffset(mq); + } + + + @Override + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq); + } + + + 
@Override + public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException { + return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId); + } + + + @Override + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, begin, end); + } + + + public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { + return allocateMessageQueueStrategy; + } + + + public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + } + + + public long getBrokerSuspendMaxTimeMillis() { + return brokerSuspendMaxTimeMillis; + } + + + public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) { + this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis; + } + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public long getConsumerPullTimeoutMillis() { + return consumerPullTimeoutMillis; + } + + + public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) { + this.consumerPullTimeoutMillis = consumerPullTimeoutMillis; + } + + + public long getConsumerTimeoutMillisWhenSuspend() { + return consumerTimeoutMillisWhenSuspend; + } + + + public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) { + this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend; + } + + + public MessageModel getMessageModel() { + return messageModel; + } + + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + + public MessageQueueListener getMessageQueueListener() { + return messageQueueListener; + } + + + public void 
setMessageQueueListener(MessageQueueListener messageQueueListener) { + this.messageQueueListener = messageQueueListener; + } + + + public Set<String> getRegisterTopics() { + return registerTopics; + } + + + public void setRegisterTopics(Set<String> registerTopics) { + this.registerTopics = registerTopics; + } + + + @Override + public void sendMessageBack(MessageExt msg, int delayLevel) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null); + } + + + @Override + public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName); + } + + @Override + public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { + return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(topic); + } + + @Override + public void start() throws MQClientException { + this.defaultMQPullConsumerImpl.start(); + } + + @Override + public void shutdown() { + this.defaultMQPullConsumerImpl.shutdown(); + } + + @Override + public void registerMessageQueueListener(String topic, MessageQueueListener listener) { + synchronized (this.registerTopics) { + this.registerTopics.add(topic); + if (listener != null) { + this.messageQueueListener = listener; + } + } + } + + @Override + public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums); + } + + @Override + public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return 
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout); + } + + @Override + public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback); + } + + @Override + public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout); + } + + @Override + public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums); + } + + @Override + public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback); + } + + @Override + public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException { + this.defaultMQPullConsumerImpl.updateConsumeOffset(mq, offset); + } + + @Override + public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException { + return this.defaultMQPullConsumerImpl.fetchConsumeOffset(mq, fromStore); + } + + @Override + public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException { + return this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(topic); + } + + @Override + public MessageExt viewMessage(String topic, String uniqKey) throws RemotingException, MQBrokerException, 
InterruptedException, MQClientException { + try { + MessageDecoder.decodeMessageId(uniqKey); + return this.viewMessage(uniqKey); + } catch (Exception e) { + } + return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(topic, uniqKey); + } + + @Override + public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup); + } + + public OffsetStore getOffsetStore() { + return offsetStore; + } + + + public void setOffsetStore(OffsetStore offsetStore) { + this.offsetStore = offsetStore; + } + + + public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() { + return defaultMQPullConsumerImpl; + } + + + public boolean isUnitMode() { + return unitMode; + } + + + public void setUnitMode(boolean isUnitMode) { + this.unitMode = isUnitMode; + } + + + public int getMaxReconsumeTimes() { + return maxReconsumeTimes; + } + + + public void setMaxReconsumeTimes(final int maxReconsumeTimes) { + this.maxReconsumeTimes = maxReconsumeTimes; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java new file mode 100644 index 0000000..cbed53b --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -0,0 +1,519 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + + +/** + * Wrapped push consumer.in fact,it works as remarkable as the pull consumer + * + * @author shijia.wxr + */ +public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { + protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; + /** + * Do the same thing for the same Group, the application must be set,and + * guarantee Globally unique + */ + private String consumerGroup; + /** + * Consumption pattern,default is clustering + */ + private MessageModel messageModel = MessageModel.CLUSTERING; + /** + * Consumption offset + */ + private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; + /** + * 
Backtracking consumption time with second precision.time format is + * 20131223171201<br> + * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br> + * Default backtracking consumption time Half an hour ago + */ + private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)); + /** + * Queue allocation algorithm + */ + private AllocateMessageQueueStrategy allocateMessageQueueStrategy; + + /** + * Subscription relationship + */ + private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>(); + /** + * Message listener + */ + private MessageListener messageListener; + /** + * Offset Storage + */ + private OffsetStore offsetStore; + /** + * Minimum consumer thread number + */ + private int consumeThreadMin = 20; + /** + * Max consumer thread number + */ + private int consumeThreadMax = 64; + + /** + * Threshold for dynamic adjustment of the number of thread pool + */ + private long adjustThreadPoolNumsThreshold = 100000; + + /** + * Concurrently max span offset.it has no effect on sequential consumption + */ + private int consumeConcurrentlyMaxSpan = 2000; + /** + * Flow control threshold + */ + private int pullThresholdForQueue = 1000; + /** + * Message pull Interval + */ + private long pullInterval = 0; + /** + * Batch consumption size + */ + private int consumeMessageBatchMaxSize = 1; + /** + * Batch pull size + */ + private int pullBatchSize = 32; + + /** + * Whether update subscription relationship when every pull + */ + private boolean postSubscriptionWhenPull = false; + + /** + * Whether the unit of subscription group + */ + private boolean unitMode = false; + + private int maxReconsumeTimes = -1; + private long suspendCurrentQueueTimeMillis = 1000; + private long consumeTimeout = 15; + + + public DefaultMQPushConsumer() { + this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); + } + + + public DefaultMQPushConsumer(final 
String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + this.consumerGroup = consumerGroup; + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); + } + + + public DefaultMQPushConsumer(RPCHook rpcHook) { + this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); + } + + + public DefaultMQPushConsumer(final String consumerGroup) { + this(consumerGroup, null, new AllocateMessageQueueAveragely()); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + createTopic(key, newTopic, queueNum, 0); + } + + + @Override + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); + } + + + @Override + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp); + } + + + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return this.defaultMQPushConsumerImpl.maxOffset(mq); + } + + + @Override + public long minOffset(MessageQueue mq) throws MQClientException { + return this.defaultMQPushConsumerImpl.minOffset(mq); + } + + + @Override + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq); + } + + + @Override + public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId); + } + + + @Override + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException { + return 
this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end); + } + + @Override + public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + try { + MessageDecoder.decodeMessageId(msgId); + return this.viewMessage(msgId); + } catch (Exception e) { + } + return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, msgId); + } + + public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { + return allocateMessageQueueStrategy; + } + + + public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + } + + + public int getConsumeConcurrentlyMaxSpan() { + return consumeConcurrentlyMaxSpan; + } + + + public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) { + this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan; + } + + + public ConsumeFromWhere getConsumeFromWhere() { + return consumeFromWhere; + } + + + public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { + this.consumeFromWhere = consumeFromWhere; + } + + + public int getConsumeMessageBatchMaxSize() { + return consumeMessageBatchMaxSize; + } + + + public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) { + this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; + } + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public int getConsumeThreadMax() { + return consumeThreadMax; + } + + + public void setConsumeThreadMax(int consumeThreadMax) { + this.consumeThreadMax = consumeThreadMax; + } + + + public int getConsumeThreadMin() { + return consumeThreadMin; + } + + + public void setConsumeThreadMin(int consumeThreadMin) { + this.consumeThreadMin = consumeThreadMin; + } + + + public 
DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() { + return defaultMQPushConsumerImpl; + } + + + public MessageListener getMessageListener() { + return messageListener; + } + + + public void setMessageListener(MessageListener messageListener) { + this.messageListener = messageListener; + } + + + public MessageModel getMessageModel() { + return messageModel; + } + + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + + public int getPullBatchSize() { + return pullBatchSize; + } + + + public void setPullBatchSize(int pullBatchSize) { + this.pullBatchSize = pullBatchSize; + } + + + public long getPullInterval() { + return pullInterval; + } + + + public void setPullInterval(long pullInterval) { + this.pullInterval = pullInterval; + } + + + public int getPullThresholdForQueue() { + return pullThresholdForQueue; + } + + + public void setPullThresholdForQueue(int pullThresholdForQueue) { + this.pullThresholdForQueue = pullThresholdForQueue; + } + + + public Map<String, String> getSubscription() { + return subscription; + } + + + public void setSubscription(Map<String, String> subscription) { + this.subscription = subscription; + } + + + @Override + public void sendMessageBack(MessageExt msg, int delayLevel) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null); + } + + + @Override + public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName); + } + + + @Override + public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException { + return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic); + } + + + @Override + public void start() throws MQClientException { + 
this.defaultMQPushConsumerImpl.start(); + } + + + @Override + public void shutdown() { + this.defaultMQPushConsumerImpl.shutdown(); + } + + + @Override + @Deprecated + public void registerMessageListener(MessageListener messageListener) { + this.messageListener = messageListener; + this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); + } + + + @Override + public void registerMessageListener(MessageListenerConcurrently messageListener) { + this.messageListener = messageListener; + this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); + } + + + @Override + public void registerMessageListener(MessageListenerOrderly messageListener) { + this.messageListener = messageListener; + this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); + } + + + @Override + public void subscribe(String topic, String subExpression) throws MQClientException { + this.defaultMQPushConsumerImpl.subscribe(topic, subExpression); + } + + + @Override + public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { + this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource); + } + + + @Override + public void unsubscribe(String topic) { + this.defaultMQPushConsumerImpl.unsubscribe(topic); + } + + + @Override + public void updateCorePoolSize(int corePoolSize) { + this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize); + } + + + @Override + public void suspend() { + this.defaultMQPushConsumerImpl.suspend(); + } + + + @Override + public void resume() { + this.defaultMQPushConsumerImpl.resume(); + } + + + public OffsetStore getOffsetStore() { + return offsetStore; + } + + + public void setOffsetStore(OffsetStore offsetStore) { + this.offsetStore = offsetStore; + } + + + public String getConsumeTimestamp() { + return consumeTimestamp; + } + + + public void setConsumeTimestamp(String consumeTimestamp) { + this.consumeTimestamp = consumeTimestamp; + } + + + public boolean 
isPostSubscriptionWhenPull() { + return postSubscriptionWhenPull; + } + + + public void setPostSubscriptionWhenPull(boolean postSubscriptionWhenPull) { + this.postSubscriptionWhenPull = postSubscriptionWhenPull; + } + + + public boolean isUnitMode() { + return unitMode; + } + + + public void setUnitMode(boolean isUnitMode) { + this.unitMode = isUnitMode; + } + + + public long getAdjustThreadPoolNumsThreshold() { + return adjustThreadPoolNumsThreshold; + } + + + public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) { + this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold; + } + + + public int getMaxReconsumeTimes() { + return maxReconsumeTimes; + } + + + public void setMaxReconsumeTimes(final int maxReconsumeTimes) { + this.maxReconsumeTimes = maxReconsumeTimes; + } + + + public long getSuspendCurrentQueueTimeMillis() { + return suspendCurrentQueueTimeMillis; + } + + + public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) { + this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; + } + + + public long getConsumeTimeout() { + return consumeTimeout; + } + + public void setConsumeTimeout(final long consumeTimeout) { + this.consumeTimeout = consumeTimeout; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java new file mode 100644 index 0000000..3e26ed6 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.client.MQAdmin; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.Set; + + +/** + * Message queue consumer interface; adds message send-back and queue lookup operations to {@link MQAdmin}. + * + * @author shijia.wxr + */ +public interface MQConsumer extends MQAdmin { + /** + * If consumption fails, the message is sent back to the broker and consumed again after a delay. + * + * @param msg the message to send back + * @param delayLevel the delay level to apply (valid values are not defined in this file) + * + * @throws InterruptedException + * @throws MQBrokerException + * @throws RemotingException + * @throws MQClientException + */ + @Deprecated + void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException, + MQBrokerException, InterruptedException, MQClientException; + + + /** + * If consumption fails, the message is sent back to the broker and consumed again after a delay. + * + * @param msg the message to send back + * @param delayLevel the delay level to apply (valid values are not defined in this file) + * @param brokerName presumably the name of the broker the message is sent back to - TODO confirm + * + * @throws RemotingException + * @throws MQBrokerException + * @throws InterruptedException + * @throws MQClientException + */ + void 
sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + + + /** + * Fetches the message queues of the given topic from the consumer cache. + * + * @param topic + * message topic + * + * @return the set of message queues + * + * @throws MQClientException + */ + Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java new file mode 100644 index 0000000..d651562 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.Set; + + +/** + * Pulling consumer interface + * + * @author shijia.wxr + */ +public interface MQPullConsumer extends MQConsumer { + /** + * Start the consumer + * + * @throws MQClientException + */ + void start() throws MQClientException; + + + /** + * Shutdown the consumer + */ + void shutdown(); + + + /** + * Register the message queue listener + * + * @param topic + * @param listener + */ + void registerMessageQueueListener(final String topic, final MessageQueueListener listener); + + + /** + * Pulling the messages,not blocking + * + * @param mq + * from which message queue + * @param subExpression + * subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> + * if null or * expression,meaning subscribe all + * @param offset + * from where to pull + * @param maxNums + * max pulling numbers + * + * @return The resulting {@code PullRequest} + * + * @throws MQClientException + * @throws InterruptedException + * @throws MQBrokerException + * @throws RemotingException + */ + PullResult pull(final MessageQueue mq, final String subExpression, final long offset, + final int maxNums) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + + + /** + * Pulling the messages in the specified timeout + * + * @param mq + * @param subExpression + * @param offset + * @param maxNums + * @param timeout + * + * @return The resulting {@code PullRequest} + * + * @throws MQClientException + * @throws RemotingException + * @throws MQBrokerException + * @throws InterruptedException + */ + PullResult pull(final MessageQueue mq, final String 
subExpression, final long offset, + final int maxNums, final long timeout) throws MQClientException, RemotingException, + MQBrokerException, InterruptedException; + + + /** + * Pulling the messages in a async. way + * + * @param mq + * @param subExpression + * @param offset + * @param maxNums + * @param pullCallback + * + * @throws MQClientException + * @throws RemotingException + * @throws InterruptedException + */ + void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums, + final PullCallback pullCallback) throws MQClientException, RemotingException, + InterruptedException; + + /** + * Pulling the messages in a async. way + * + * @param mq + * @param subExpression + * @param offset + * @param maxNums + * @param pullCallback + * @param timeout + * + * @throws MQClientException + * @throws RemotingException + * @throws InterruptedException + */ + void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums, + final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, + InterruptedException; + + + /** + * Pulling the messages,if no message arrival,blocking some time + * + * @param mq + * @param subExpression + * @param offset + * @param maxNums + * + * @return The resulting {@code PullRequest} + * + * @throws MQClientException + * @throws RemotingException + * @throws MQBrokerException + * @throws InterruptedException + */ + PullResult pullBlockIfNotFound(final MessageQueue mq, final String subExpression, + final long offset, final int maxNums) throws MQClientException, RemotingException, + MQBrokerException, InterruptedException; + + + /** + * Pulling the messages through callback function,if no message arrival,blocking. 
+ * + * @param mq + * @param subExpression + * @param offset + * @param maxNums + * @param pullCallback + * + * @throws MQClientException + * @throws RemotingException + * @throws InterruptedException + */ + void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset, + final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException, + InterruptedException; + + + /** + * Update the offset + * + * @param mq + * @param offset + * + * @throws MQClientException + */ + void updateConsumeOffset(final MessageQueue mq, final long offset) throws MQClientException; + + + /** + * Fetch the offset + * + * @param mq + * @param fromStore + * + * @return The fetched offset of given queue + * + * @throws MQClientException + */ + long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) throws MQClientException; + + + /** + * Fetch the message queues according to the topic + * + * @param topic + * message topic + * + * @return message queue set + * + * @throws MQClientException + */ + Set<MessageQueue> fetchMessageQueuesInBalance(final String topic) throws MQClientException; + + /** + * If consuming failure,message will be send back to the broker,and delay consuming in some time later.<br> + * Mind! message can only be consumed in the same group. 
+ * + * @param msg + * @param delayLevel + * @param brokerName + * @param consumerGroup + * + * @throws RemotingException + * @throws MQBrokerException + * @throws InterruptedException + * @throws MQClientException + */ + void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException; +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java new file mode 100644 index 0000000..33cc1c9 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.slf4j.Logger; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +/** + * Schedule service for pull consumer + * + * @author shijia.wxr + */ +public class MQPullConsumerScheduleService { + private final Logger log = ClientLogger.getLog(); + private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl(); + private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable = + new ConcurrentHashMap<MessageQueue, PullTaskImpl>(); + private DefaultMQPullConsumer defaultMQPullConsumer; + private int pullThreadNums = 20; + private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable = + new ConcurrentHashMap<String, PullTaskCallback>(); + private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + + public MQPullConsumerScheduleService(final String consumerGroup) { + this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup); + this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); + } + + public void putTask(String topic, Set<MessageQueue> mqNewSet) { + Iterator<Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, PullTaskImpl> next = it.next(); + if (next.getKey().getTopic().equals(topic)) { + if (!mqNewSet.contains(next.getKey())) { + next.getValue().setCancelled(true); + it.remove(); + } + } + } + + for (MessageQueue mq : mqNewSet) { + if (!this.taskTable.containsKey(mq)) { + PullTaskImpl 
command = new PullTaskImpl(mq); + this.taskTable.put(mq, command); + this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS); + + } + } + } + + public void start() throws MQClientException { + final String group = this.defaultMQPullConsumer.getConsumerGroup(); + this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor( + this.pullThreadNums, + new ThreadFactoryImpl("PullMsgThread-" + group) + ); + + this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener); + + this.defaultMQPullConsumer.start(); + + log.info("MQPullConsumerScheduleService start OK, {} {}", + this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable); + } + + public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) { + this.callbackTable.put(topic, callback); + this.defaultMQPullConsumer.registerMessageQueueListener(topic, null); + } + + public void shutdown() { + if (this.scheduledThreadPoolExecutor != null) { + this.scheduledThreadPoolExecutor.shutdown(); + } + + if (this.defaultMQPullConsumer != null) { + this.defaultMQPullConsumer.shutdown(); + } + } + + public ConcurrentHashMap<String, PullTaskCallback> getCallbackTable() { + return callbackTable; + } + + public void setCallbackTable(ConcurrentHashMap<String, PullTaskCallback> callbackTable) { + this.callbackTable = callbackTable; + } + + public int getPullThreadNums() { + return pullThreadNums; + } + + public void setPullThreadNums(int pullThreadNums) { + this.pullThreadNums = pullThreadNums; + } + + public DefaultMQPullConsumer getDefaultMQPullConsumer() { + return defaultMQPullConsumer; + } + + public void setDefaultMQPullConsumer(DefaultMQPullConsumer defaultMQPullConsumer) { + this.defaultMQPullConsumer = defaultMQPullConsumer; + } + + public MessageModel getMessageModel() { + return this.defaultMQPullConsumer.getMessageModel(); + } + + public void setMessageModel(MessageModel messageModel) { + 
this.defaultMQPullConsumer.setMessageModel(messageModel); + } + + class MessageQueueListenerImpl implements MessageQueueListener { + @Override + public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { + MessageModel messageModel = + MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel(); + switch (messageModel) { + case BROADCASTING: + MQPullConsumerScheduleService.this.putTask(topic, mqAll); + break; + case CLUSTERING: + MQPullConsumerScheduleService.this.putTask(topic, mqDivided); + break; + default: + break; + } + } + } + + class PullTaskImpl implements Runnable { + private final MessageQueue messageQueue; + private volatile boolean cancelled = false; + + + public PullTaskImpl(final MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + + @Override + public void run() { + String topic = this.messageQueue.getTopic(); + if (!this.isCancelled()) { + PullTaskCallback pullTaskCallback = + MQPullConsumerScheduleService.this.callbackTable.get(topic); + if (pullTaskCallback != null) { + final PullTaskContext context = new PullTaskContext(); + context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer); + try { + pullTaskCallback.doPullTask(this.messageQueue, context); + } catch (Throwable e) { + context.setPullNextDelayTimeMillis(1000); + log.error("doPullTask Exception", e); + } + + if (!this.isCancelled()) { + MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this, + context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS); + } else { + log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue); + } + } else { + log.warn("Pull Task Callback not exist , {}", topic); + } + } else { + log.warn("The Pull Task is cancelled, {}", messageQueue); + } + } + + + public boolean isCancelled() { + return cancelled; + } + + + public void setCancelled(boolean cancelled) { + this.cancelled = cancelled; + } + + + public MessageQueue 
getMessageQueue() { + return messageQueue; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java new file mode 100644 index 0000000..982c839 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; + + +/** + * Push consumer + * + * @author shijia.wxr + */ +public interface MQPushConsumer extends MQConsumer { + /** + * Start the consumer + * + * @throws MQClientException + */ + void start() throws MQClientException; + + + /** + * Shutdown the consumer + */ + void shutdown(); + + + /** + * Register the message listener + * + * @param messageListener + */ + @Deprecated + void registerMessageListener(MessageListener messageListener); + + + void registerMessageListener(final MessageListenerConcurrently messageListener); + + + void registerMessageListener(final MessageListenerOrderly messageListener); + + + /** + * Subscribe some topic + * + * @param topic + * @param subExpression + * subscription expression.it only support or operation such as + * "tag1 || tag2 || tag3" <br> + * if null or * expression,meaning subscribe all + * + * @throws MQClientException + */ + void subscribe(final String topic, final String subExpression) throws MQClientException; + + + /** + * Subscribe some topic + * + * @param topic + * @param fullClassName + * full class name,must extend + * org.apache.rocketmq.common.filter. 
MessageFilter + * @param filterClassSource + * class source code,used UTF-8 file encoding,must be responsible + * for your code safety + * + * @throws MQClientException + */ + void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException; + + + /** + * Unsubscribe consumption some topic + * + * @param topic + * message topic + */ + void unsubscribe(final String topic); + + + /** + * Update the consumer thread pool size Dynamically + * + * @param corePoolSize + */ + void updateCorePoolSize(int corePoolSize); + + + /** + * Suspend the consumption + */ + void suspend(); + + + /** + * Resume the consumption + */ + void resume(); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java new file mode 100644 index 0000000..e59a3ce --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.Set; + + +/** + * A MessageQueueListener is implemented by the application and may be specified when a message queue changed + * + * @author shijia.wxr + * @author vongosling + */ +public interface MessageQueueListener { + /** + * @param topic + * message topic + * @param mqAll + * all queues in this message topic + * @param mqDivided + * collection of queues,assigned to the current consumer + */ + void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, + final Set<MessageQueue> mqDivided); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java new file mode 100644 index 0000000..2429d5a --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +/** + * Async message pulling interface + * + * @author shijia.wxr + */ +public interface PullCallback { + public void onSuccess(final PullResult pullResult); + + public void onException(final Throwable e); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java new file mode 100644 index 0000000..81dd497 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class PullResult { + private final PullStatus pullStatus; + private final long nextBeginOffset; + private final long minOffset; + private final long maxOffset; + private List<MessageExt> msgFoundList; + + + public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset, + List<MessageExt> msgFoundList) { + super(); + this.pullStatus = pullStatus; + this.nextBeginOffset = nextBeginOffset; + this.minOffset = minOffset; + this.maxOffset = maxOffset; + this.msgFoundList = msgFoundList; + } + + + public PullStatus getPullStatus() { + return pullStatus; + } + + + public long getNextBeginOffset() { + return nextBeginOffset; + } + + + public long getMinOffset() { + return minOffset; + } + + + public long getMaxOffset() { + return maxOffset; + } + + + public List<MessageExt> getMsgFoundList() { + return msgFoundList; + } + + + public void setMsgFoundList(List<MessageExt> msgFoundList) { + this.msgFoundList = msgFoundList; + } + + + @Override + public String toString() { + return "PullResult [pullStatus=" + pullStatus + ", nextBeginOffset=" + nextBeginOffset + + ", minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", msgFoundList=" + + (msgFoundList == null ? 
0 : msgFoundList.size()) + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java new file mode 100644 index 0000000..b63a4c4 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Status carried by a pull result.
 *
 * @author shijia.wxr
 */
public enum PullStatus {

    /** Messages were found. */
    FOUND,

    /** No new message can be pulled. */
    NO_NEW_MSG,

    /** The filtering results do not match. */
    NO_MATCHED_MSG,

    /** Illegal offset; it may be too big or too small. */
    OFFSET_ILLEGAL
}
+ */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.common.message.MessageQueue; + + +public interface PullTaskCallback { + public void doPullTask(final MessageQueue mq, final PullTaskContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java new file mode 100644 index 0000000..ba66a1f --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.client.consumer; + +public class PullTaskContext { + + private int pullNextDelayTimeMillis = 200; + + private MQPullConsumer pullConsumer; + + + public int getPullNextDelayTimeMillis() { + return pullNextDelayTimeMillis; + } + + + public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) { + this.pullNextDelayTimeMillis = pullNextDelayTimeMillis; + } + + + public MQPullConsumer getPullConsumer() { + return pullConsumer; + } + + + public void setPullConsumer(MQPullConsumer pullConsumer) { + this.pullConsumer = pullConsumer; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java new file mode 100644 index 0000000..03223ba --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.listener; + +import org.apache.rocketmq.common.message.MessageQueue; + + +/** + * Consumer concurrent consumption context + * + * @author shijia.wxr + */ +public class ConsumeConcurrentlyContext { + private final MessageQueue messageQueue; + /** + * Message consume retry strategy<br> + * -1,no retry,put into DLQ directly<br> + * 0,broker control retry frequency<br> + * >0,client control retry frequency + */ + private int delayLevelWhenNextConsume = 0; + private int ackIndex = Integer.MAX_VALUE; + + public ConsumeConcurrentlyContext(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + + public int getDelayLevelWhenNextConsume() { + return delayLevelWhenNextConsume; + } + + + public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) { + this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; + } + + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + + public int getAckIndex() { + return ackIndex; + } + + + public void setAckIndex(int ackIndex) { + this.ackIndex = ackIndex; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java new file mode 100644 index 0000000..433ce36 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Outcome reported for a concurrent consumption.
 *
 * @author shijia.wxr
 */
public enum ConsumeConcurrentlyStatus {

    /** Consumption succeeded. */
    CONSUME_SUCCESS,

    /** Consumption failed; try to consume again later. */
    RECONSUME_LATER
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.listener; + +import org.apache.rocketmq.common.message.MessageQueue; + + +/** + * Consumer Orderly consumption context + * + * @author shijia.wxr + */ +public class ConsumeOrderlyContext { + private final MessageQueue messageQueue; + private boolean autoCommit = true; + private long suspendCurrentQueueTimeMillis = -1; + + + public ConsumeOrderlyContext(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + + public boolean isAutoCommit() { + return autoCommit; + } + + + public void setAutoCommit(boolean autoCommit) { + this.autoCommit = autoCommit; + } + + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + + public long getSuspendCurrentQueueTimeMillis() { + return suspendCurrentQueueTimeMillis; + } + + + public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) { + this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java new file mode 100644 index 0000000..7da0b1f --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java @@ -0,0 +1,41 @@ +/** + 
/**
 * Result values an orderly message listener can return after handling a batch.
 *
 * <p>The declaration order of the constants is part of the type's contract
 * (ordinals) and must not be changed.
 *
 * @author shijia.wxr
 */
public enum ConsumeOrderlyStatus {
    /**
     * The messages were consumed successfully.
     */
    SUCCESS,

    /**
     * Roll back the consumption (only for binlog consumption).
     *
     * @deprecated marked deprecated in the original source; no replacement is named there.
     */
    @Deprecated
    ROLLBACK,

    /**
     * Commit the offset (only for binlog consumption).
     *
     * @deprecated marked deprecated in the original source; no replacement is named there.
     */
    @Deprecated
    COMMIT,

    /**
     * Suspend the current queue for a moment.
     */
    SUSPEND_CURRENT_QUEUE_A_MOMENT
}
/**
 * Classifies how a single consume call ended.
 *
 * <p>The declaration order of the constants is part of the type's contract
 * (ordinals) and must not be changed.
 *
 * Created by alvin on 16-11-30.
 */
public enum ConsumeReturnType {
    /** The consume call returned success. */
    SUCCESS,

    /** The consume call timed out, even if it eventually succeeded. */
    TIME_OUT,

    /** The consume call threw an exception. */
    EXCEPTION,

    /** The consume call returned {@code null}. */
    RETURNNULL,

    /** The consume call returned a failure result. */
    FAILED
}
/**
 * Marker interface for objects that receive asynchronously delivered messages.
 *
 * <p>It declares no methods of its own; the concrete callback signatures are
 * defined by the interfaces that extend it.
 *
 * @author shijia.wxr
 */
public interface MessageListener {
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.listener; + +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; + + +/** + * A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently + * + * @author shijia.wxr + */ +public interface MessageListenerConcurrently extends MessageListener { + /** + * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure + * + * @param msgs + * msgs.size() >= 1<br> + * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here + * @param context + * + * @return The consume status + */ + ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs, + final ConsumeConcurrentlyContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java new file mode 100644 index 0000000..d1b6c79 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/MessageListenerOrderly.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.listener; + +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; + + +/** + * A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one thread + * + * @author shijia.wxr + */ +public interface MessageListenerOrderly extends MessageListener { + /** + * It is not recommend to throw exception,rather than returning ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT if consumption failure + * + * @param msgs + * msgs.size() >= 1<br> + * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here + * @param context + * + * @return The consume status + */ + ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs, + final ConsumeOrderlyContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java new file mode 100644 index 
0000000..747df83 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.rebalance; + +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + + +/** + * Average Hashing queue algorithm + * + * @author manhong.yqd + */ +public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { + private final Logger log = ClientLogger.getLog(); + + @Override + public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, + List<String> cidAll) { + if (currentCID == null || currentCID.length() < 1) { + throw new IllegalArgumentException("currentCID is empty"); + } + if (mqAll == null || mqAll.isEmpty()) { + throw new IllegalArgumentException("mqAll is null or mqAll empty"); + } + if (cidAll == null || cidAll.isEmpty()) { + throw new IllegalArgumentException("cidAll is null or cidAll 
empty"); + } + + List<MessageQueue> result = new ArrayList<MessageQueue>(); + if (!cidAll.contains(currentCID)) { + log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", + consumerGroup, + currentCID, + cidAll); + return result; + } + + int index = cidAll.indexOf(currentCID); + int mod = mqAll.size() % cidAll.size(); + int averageSize = + mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + + 1 : mqAll.size() / cidAll.size()); + int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; + int range = Math.min(averageSize, mqAll.size() - startIndex); + for (int i = 0; i < range; i++) { + result.add(mqAll.get((startIndex + i) % mqAll.size())); + } + return result; + } + + @Override + public String getName() { + return "AVG"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java new file mode 100644 index 0000000..d6ab041 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer.rebalance; + +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; + + +/** + * Cycle average Hashing queue algorithm + * + * @author manhong.yqd + */ +public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy { + private final Logger log = ClientLogger.getLog(); + + @Override + public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, + List<String> cidAll) { + if (currentCID == null || currentCID.length() < 1) { + throw new IllegalArgumentException("currentCID is empty"); + } + if (mqAll == null || mqAll.isEmpty()) { + throw new IllegalArgumentException("mqAll is null or mqAll empty"); + } + if (cidAll == null || cidAll.isEmpty()) { + throw new IllegalArgumentException("cidAll is null or cidAll empty"); + } + + List<MessageQueue> result = new ArrayList<MessageQueue>(); + if (!cidAll.contains(currentCID)) { + log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", + consumerGroup, + currentCID, + cidAll); + return result; + } + + int index = cidAll.indexOf(currentCID); + for (int i = index; i < mqAll.size(); i++) { + if (i % cidAll.size() == index) { + result.add(mqAll.get(i)); + } + } + return result; + } + + @Override + public String getName() { + return "AVG_BY_CIRCLE"; + } +}