http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java new file mode 100644 index 0000000..efc5ab0 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.common.message.MessageQueue; + + +/** + * @author shijia.wxr + */ +public class PullRequest { + private String consumerGroup; + private MessageQueue messageQueue; + private ProcessQueue processQueue; + private long nextOffset; + private boolean lockedFirst = false; + + public boolean isLockedFirst() { + return lockedFirst; + } + + public void setLockedFirst(boolean lockedFirst) { + this.lockedFirst = lockedFirst; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + + public void setMessageQueue(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + + public long getNextOffset() { + return nextOffset; + } + + + public void setNextOffset(long nextOffset) { + this.nextOffset = nextOffset; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode()); + result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + PullRequest other = (PullRequest) obj; + if (consumerGroup == null) { + if (other.consumerGroup != null) + return false; + } else if (!consumerGroup.equals(other.consumerGroup)) + return false; + if (messageQueue == null) { + if (other.messageQueue != null) + return false; + } else if (!messageQueue.equals(other.messageQueue)) + return false; + return true; + } + + @Override + public String toString() { + return "PullRequest [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue + + ", nextOffset=" + nextOffset + "]"; + } + + public ProcessQueue getProcessQueue() { + return processQueue; + } + + + public void setProcessQueue(ProcessQueue processQueue) { + this.processQueue = processQueue; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java new file mode 100644 index 0000000..e140b6a --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.client.consumer.PullResult; +import com.alibaba.rocketmq.client.consumer.PullStatus; +import com.alibaba.rocketmq.common.message.MessageExt; + +import java.util.List; + + +/** + * @author shijia.wxr + */ +public class PullResultExt extends PullResult { + private final long suggestWhichBrokerId; + private byte[] messageBinary; + + + public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset, + List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) { + super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList); + this.suggestWhichBrokerId = suggestWhichBrokerId; + this.messageBinary = messageBinary; + } + + + public byte[] getMessageBinary() { + return messageBinary; + } + + + public void setMessageBinary(byte[] messageBinary) { + this.messageBinary = messageBinary; + } + + + public long getSuggestWhichBrokerId() { + return suggestWhichBrokerId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java new file mode 100644 index 0000000..641bb75 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -0,0 +1,481 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import com.alibaba.rocketmq.client.impl.FindBrokerResult; +import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.body.LockBatchRequestBody; +import com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody; +import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; +import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.slf4j.Logger; + +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * Base class for rebalance algorithm + * + * @author shijia.wxr + */ +public abstract class RebalanceImpl { + protected static final Logger log = ClientLogger.getLog(); + protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); + protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = + new ConcurrentHashMap<String, Set<MessageQueue>>(); + protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner = + new ConcurrentHashMap<String, SubscriptionData>(); + protected String consumerGroup; + protected MessageModel messageModel; + protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; + protected MQClientInstance mQClientFactory; + + + public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, + MQClientInstance mQClientFactory) { + this.consumerGroup = consumerGroup; + this.messageModel = messageModel; + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + this.mQClientFactory = mQClientFactory; + } + + public void unlock(final MessageQueue mq, final boolean oneway) { + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); + if (findBrokerResult != null) { + UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody(); + requestBody.setConsumerGroup(this.consumerGroup); + requestBody.setClientId(this.mQClientFactory.getClientId()); + requestBody.getMqSet().add(mq); + + try { + this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway); + log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", // + this.consumerGroup, // + this.mQClientFactory.getClientId(), // + mq); + } catch (Exception e) { + log.error("unlockBatchMQ exception, " + mq, e); + } + } + } + + public void unlockAll(final boolean oneway) { + HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName(); + + for (final Map.Entry<String, Set<MessageQueue>> entry : brokerMqs.entrySet()) { + final String brokerName = entry.getKey(); + final Set<MessageQueue> mqs = entry.getValue(); + + if (mqs.isEmpty()) + continue; + + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); + if (findBrokerResult != null) { + UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody(); + requestBody.setConsumerGroup(this.consumerGroup); + requestBody.setClientId(this.mQClientFactory.getClientId()); + requestBody.setMqSet(mqs); + + try { + this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway); + + for (MessageQueue mq : mqs) { + ProcessQueue processQueue = this.processQueueTable.get(mq); + if (processQueue != null) { + processQueue.setLocked(false); + log.info("the message queue unlock OK, Group: {} {}", this.consumerGroup, mq); + } + } + } catch (Exception e) { + log.error("unlockBatchMQ exception, " + mqs, e); + } + } + } + } + + private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() { + HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>(); + for (MessageQueue mq : this.processQueueTable.keySet()) { + Set<MessageQueue> mqs = result.get(mq.getBrokerName()); + if (null == mqs) { + mqs = new HashSet<MessageQueue>(); + result.put(mq.getBrokerName(), mqs); + } + + mqs.add(mq); + } + + return result; + } + + public boolean lock(final MessageQueue mq) { + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); + if (findBrokerResult != null) { + LockBatchRequestBody requestBody = new LockBatchRequestBody(); + requestBody.setConsumerGroup(this.consumerGroup); + requestBody.setClientId(this.mQClientFactory.getClientId()); + requestBody.getMqSet().add(mq); + + try { + Set<MessageQueue> lockedMq = + this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); + for (MessageQueue mmqq : lockedMq) { + ProcessQueue processQueue = this.processQueueTable.get(mmqq); + if (processQueue != null) { + processQueue.setLocked(true); + processQueue.setLastLockTimestamp(System.currentTimeMillis()); + } + } + + boolean lockOK = lockedMq.contains(mq); + log.info("the message queue lock {}, {} {}", + lockOK ? "OK" : "Failed", + this.consumerGroup, + mq); + return lockOK; + } catch (Exception e) { + log.error("lockBatchMQ exception, " + mq, e); + } + } + + return false; + } + + public void lockAll() { + HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName(); + + Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, Set<MessageQueue>> entry = it.next(); + final String brokerName = entry.getKey(); + final Set<MessageQueue> mqs = entry.getValue(); + + if (mqs.isEmpty()) + continue; + + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); + if (findBrokerResult != null) { + LockBatchRequestBody requestBody = new LockBatchRequestBody(); + requestBody.setConsumerGroup(this.consumerGroup); + requestBody.setClientId(this.mQClientFactory.getClientId()); + requestBody.setMqSet(mqs); + + try { + Set<MessageQueue> lockOKMQSet = + this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); + + for (MessageQueue mq : lockOKMQSet) { + ProcessQueue processQueue = this.processQueueTable.get(mq); + if (processQueue != null) { + if (!processQueue.isLocked()) { + log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq); + } + + processQueue.setLocked(true); + processQueue.setLastLockTimestamp(System.currentTimeMillis()); + } + } + for (MessageQueue mq : mqs) { + if (!lockOKMQSet.contains(mq)) { + ProcessQueue processQueue = this.processQueueTable.get(mq); + if (processQueue != null) { + processQueue.setLocked(false); + log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq); + } + } + } + } catch (Exception e) { + log.error("lockBatchMQ exception, " + mqs, e); + } + } + } + } + + public void doRebalance(final boolean isOrder) { + Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); + if (subTable != null) { + for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { + final String topic = entry.getKey(); + try { + this.rebalanceByTopic(topic, isOrder); + } catch (Throwable e) { + if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + log.warn("rebalanceByTopic Exception", e); + } + } + } + } + + this.truncateMessageQueueNotMyTopic(); + } + + public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() { + return subscriptionInner; + } + + private void rebalanceByTopic(final String topic, final boolean isOrder) { + switch (messageModel) { + case BROADCASTING: { + Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); + if (mqSet != null) { + boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); + if (changed) { + this.messageQueueChanged(topic, mqSet, mqSet); + log.info("messageQueueChanged {} {} {} {}", // + consumerGroup, // + topic, // + mqSet, // + mqSet); + } + } else { + log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); + } + break; + } + case CLUSTERING: { + Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); + List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); + if (null == mqSet) { + if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); + } + } + + if (null == cidAll) { + log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); + } + + if (mqSet != null && cidAll != null) { + List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); + mqAll.addAll(mqSet); + + Collections.sort(mqAll); + Collections.sort(cidAll); + + AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; + + List<MessageQueue> allocateResult = null; + try { + allocateResult = strategy.allocate(// + this.consumerGroup, // + this.mQClientFactory.getClientId(), // + mqAll, // + cidAll); + } catch (Throwable e) { + log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), + e); + return; + } + + Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); + if (allocateResult != null) { + allocateResultSet.addAll(allocateResult); + } + + boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); + if (changed) { + log.info( + "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", + strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), + allocateResultSet.size(), allocateResultSet); + this.messageQueueChanged(topic, mqSet, allocateResultSet); + } + } + break; + } + default: + break; + } + } + + private void truncateMessageQueueNotMyTopic() { + Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); + + for (MessageQueue mq : this.processQueueTable.keySet()) { + if (!subTable.containsKey(mq.getTopic())) { + + ProcessQueue pq = this.processQueueTable.remove(mq); + if (pq != null) { + pq.setDropped(true); + log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq); + } + } + } + } + + private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { + boolean changed = false; + + Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, ProcessQueue> next = it.next(); + MessageQueue mq = next.getKey(); + ProcessQueue pq = next.getValue(); + + if (mq.getTopic().equals(topic)) { + if (!mqSet.contains(mq)) { + pq.setDropped(true); + if (this.removeUnnecessaryMessageQueue(mq, pq)) { + it.remove(); + changed = true; + log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); + } + } else if (pq.isPullExpired()) { + switch (this.consumeType()) { + case CONSUME_ACTIVELY: + break; + case CONSUME_PASSIVELY: + pq.setDropped(true); + if (this.removeUnnecessaryMessageQueue(mq, pq)) { + it.remove(); + changed = true; + log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", + consumerGroup, mq); + } + break; + default: + break; + } + } + } + } + + List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); + for (MessageQueue mq : mqSet) { + if (!this.processQueueTable.containsKey(mq)) { + if (isOrder && !this.lock(mq)) { + log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); + continue; + } + + this.removeDirtyOffset(mq); + ProcessQueue pq = new ProcessQueue(); + long nextOffset = this.computePullFromWhere(mq); + if (nextOffset >= 0) { + ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); + if (pre != null) { + log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); + } else { + log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); + PullRequest pullRequest = new PullRequest(); + pullRequest.setConsumerGroup(consumerGroup); + pullRequest.setNextOffset(nextOffset); + pullRequest.setMessageQueue(mq); + pullRequest.setProcessQueue(pq); + pullRequestList.add(pullRequest); + changed = true; + } + } else { + log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); + } + } + } + + this.dispatchPullRequest(pullRequestList); + + return changed; + } + + public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided); + + public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq); + + public abstract ConsumeType consumeType(); + + public abstract void removeDirtyOffset(final MessageQueue mq); + + public abstract long computePullFromWhere(final MessageQueue mq); + + public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList); + + public void removeProcessQueue(final MessageQueue mq) { + ProcessQueue prev = this.processQueueTable.remove(mq); + if (prev != null) { + boolean droped = prev.isDropped(); + prev.setDropped(true); + this.removeUnnecessaryMessageQueue(mq, prev); + log.info("Fix Offset, {}, remove unnecessary mq, {} Droped: {}", consumerGroup, mq, droped); + } + } + + public ConcurrentHashMap<MessageQueue, ProcessQueue> getProcessQueueTable() { + return processQueueTable; + } + + + public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() { + return topicSubscribeInfoTable; + } + + + public String getConsumerGroup() { + return consumerGroup; + } + + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + + public MessageModel getMessageModel() { + return messageModel; + } + + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + + public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { + return allocateMessageQueueStrategy; + } + + + public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + } + + + public MQClientInstance getmQClientFactory() { + return mQClientFactory; + } + + + public void setmQClientFactory(MQClientInstance mQClientFactory) { + this.mQClientFactory = mQClientFactory; + } + + + public void destroy() { + Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, ProcessQueue> next = it.next(); + next.getValue().setDropped(true); + } + + this.processQueueTable.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java new file mode 100644 index 0000000..8d2b465 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import com.alibaba.rocketmq.client.consumer.MessageQueueListener; +import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; + +import java.util.List; +import java.util.Set; + + +/** + * @author shijia.wxr + */ +public class RebalancePullImpl extends RebalanceImpl { + private final DefaultMQPullConsumerImpl defaultMQPullConsumerImpl; + + + public RebalancePullImpl(DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) { + this(null, null, null, null, defaultMQPullConsumerImpl); + } + + + public RebalancePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, + MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) { + super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); + this.defaultMQPullConsumerImpl = defaultMQPullConsumerImpl; + } + + @Override + public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { + MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener(); + if (messageQueueListener != null) { + try { + messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided); + } catch (Throwable e) { + log.error("messageQueueChanged exception", e); + } + } + } + + @Override + public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { + this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq); + this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); + return true; + } + + @Override + public ConsumeType consumeType() { + return ConsumeType.CONSUME_ACTIVELY; + } + + @Override + public void removeDirtyOffset(final MessageQueue mq) { + this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); + } + + @Override + public long computePullFromWhere(MessageQueue mq) { + return 0; + } + + @Override + public void dispatchPullRequest(List<PullRequest> pullRequestList) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java new file mode 100644 index 0000000..2377d29 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import com.alibaba.rocketmq.client.consumer.store.OffsetStore; +import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; +import com.alibaba.rocketmq.common.MixAll; +import com.alibaba.rocketmq.common.UtilAll; +import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +/** + * @author shijia.wxr + */ +public class RebalancePushImpl extends RebalanceImpl { + private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000")); + private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; + + + public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { + this(null, null, null, null, defaultMQPushConsumerImpl); + } + + + public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, + MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { + super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); + this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; + } + + @Override + public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { + } + + @Override + public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { + this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); + this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); + if (this.defaultMQPushConsumerImpl.isConsumeOrderly() + && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { + try { + if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) { + try { + return this.unlockDelay(mq, pq); + } finally { + pq.getLockConsume().unlock(); + } + } else { + log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", // + mq, // + pq.getTryUnlockTimes()); + + pq.incTryUnlockTimes(); + } + } catch (Exception e) { + log.error("removeUnnecessaryMessageQueue Exception", e); + } + + return false; + } + return true; + } + + private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) { + + if (pq.hasTempMessage()) { + log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq); + this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() { + @Override + public void run() { + log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq); + RebalancePushImpl.this.unlock(mq, true); + } + }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS); + } else { + this.unlock(mq, true); + } + return true; + } + + @Override + public ConsumeType consumeType() { + return ConsumeType.CONSUME_PASSIVELY; + } + + @Override + public void removeDirtyOffset(final MessageQueue mq) { + this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); + } + + @Override + public long computePullFromWhere(MessageQueue mq) { + long result = -1; + final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); + final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); + switch (consumeFromWhere) { + case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: + case CONSUME_FROM_MIN_OFFSET: + case CONSUME_FROM_MAX_OFFSET: + case CONSUME_FROM_LAST_OFFSET: { + long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); + if (lastOffset >= 0) { + result = lastOffset; + } + // First start,no offset + else if (-1 == lastOffset) { + if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + result = 0L; + } else { + try { + result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); + } catch (MQClientException e) { + result = -1; + } + } + } else { + result = -1; + } + break; + } + case CONSUME_FROM_FIRST_OFFSET: { + long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); + if (lastOffset >= 0) { + result = lastOffset; + } else if (-1 == lastOffset) { + result = 0L; + } else { + result = -1; + } + break; + } + case CONSUME_FROM_TIMESTAMP: { + long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); + if (lastOffset >= 0) { + result = lastOffset; + } else if (-1 == lastOffset) { + if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + try { + result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); + } catch (MQClientException e) { + result = -1; + } + } else { + try { + long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), + UtilAll.YYYY_MMDD_HHMMSS).getTime(); + result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + } catch (MQClientException e) { + result = -1; + } + } + } else { + result = -1; + } + break; + } + + default: + break; + } + + return result; + } + + @Override + public void dispatchPullRequest(List<PullRequest> pullRequestList) { + for (PullRequest pullRequest : pullRequestList) { + this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); + log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java ---------------------------------------------------------------------- diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java new file mode 100644 index 0000000..47a9da5 --- /dev/null +++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.rocketmq.client.impl.consumer; + +import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; +import com.alibaba.rocketmq.client.log.ClientLogger; +import com.alibaba.rocketmq.common.ServiceThread; +import org.slf4j.Logger; + + +/** + * Rebalance Service + * + * @author shijia.wxr + */ +public class RebalanceService extends ServiceThread { + private static long waitInterval = + Long.parseLong(System.getProperty( + "rocketmq.client.rebalance.waitInterval", "20000")); + private final Logger log = ClientLogger.getLog(); + private final MQClientInstance mqClientFactory; + + public RebalanceService(MQClientInstance mqClientFactory) { + this.mqClientFactory = mqClientFactory; + } + + @Override + public void run() { + log.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + this.waitForRunning(waitInterval); + this.mqClientFactory.doRebalance(); + } + + log.info(this.getServiceName() + " service end"); + } + + + @Override + public String getServiceName() { + return RebalanceService.class.getSimpleName(); + } +}
