http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java deleted file mode 100644 index efc5ab0..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.common.message.MessageQueue; - - -/** - * @author shijia.wxr - */ -public class PullRequest { - private String consumerGroup; - private MessageQueue messageQueue; - private ProcessQueue processQueue; - private long nextOffset; - private boolean lockedFirst = false; - - public boolean isLockedFirst() { - return lockedFirst; - } - - public void setLockedFirst(boolean lockedFirst) { - this.lockedFirst = lockedFirst; - } - - public String getConsumerGroup() { - return consumerGroup; - } - - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - - public MessageQueue getMessageQueue() { - return messageQueue; - } - - - public void setMessageQueue(MessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - - - public long getNextOffset() { - return nextOffset; - } - - - public void setNextOffset(long nextOffset) { - this.nextOffset = nextOffset; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode()); - result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - PullRequest other = (PullRequest) obj; - if (consumerGroup == null) { - if (other.consumerGroup != null) - return false; - } else if (!consumerGroup.equals(other.consumerGroup)) - return false; - if (messageQueue == null) { - if (other.messageQueue != null) - return false; - } else if (!messageQueue.equals(other.messageQueue)) - return false; - return true; - } - - @Override - public String toString() { - return "PullRequest [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue - + ", nextOffset=" + nextOffset + "]"; - } - - public ProcessQueue getProcessQueue() { - return processQueue; - } - - - public void setProcessQueue(ProcessQueue processQueue) { - this.processQueue = processQueue; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java deleted file mode 100644 index e140b6a..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.client.consumer.PullResult; -import com.alibaba.rocketmq.client.consumer.PullStatus; -import com.alibaba.rocketmq.common.message.MessageExt; - -import java.util.List; - - -/** - * @author shijia.wxr - */ -public class PullResultExt extends PullResult { - private final long suggestWhichBrokerId; - private byte[] messageBinary; - - - public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset, - List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) { - super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList); - this.suggestWhichBrokerId = suggestWhichBrokerId; - this.messageBinary = messageBinary; - } - - - public byte[] getMessageBinary() { - return messageBinary; - } - - - public void setMessageBinary(byte[] messageBinary) { - this.messageBinary = messageBinary; - } - - - public long getSuggestWhichBrokerId() { - return suggestWhichBrokerId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java deleted file mode 100644 index 641bb75..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java +++ /dev/null @@ -1,481 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import com.alibaba.rocketmq.client.impl.FindBrokerResult; -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.body.LockBatchRequestBody; -import com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; -import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData; -import org.slf4j.Logger; - -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * Base class for rebalance algorithm - * - * @author shijia.wxr - */ -public abstract class RebalanceImpl { - protected static final Logger log = ClientLogger.getLog(); - protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); - protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = - new ConcurrentHashMap<String, Set<MessageQueue>>(); - protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner = - new ConcurrentHashMap<String, SubscriptionData>(); - protected String consumerGroup; - protected MessageModel messageModel; - protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; - protected MQClientInstance mQClientFactory; - - - public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, - MQClientInstance mQClientFactory) { - this.consumerGroup = consumerGroup; - this.messageModel = messageModel; - this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; - this.mQClientFactory = mQClientFactory; - } - - public void unlock(final MessageQueue mq, final boolean oneway) { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); - if (findBrokerResult != null) { - UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody(); - requestBody.setConsumerGroup(this.consumerGroup); - requestBody.setClientId(this.mQClientFactory.getClientId()); - requestBody.getMqSet().add(mq); - - try { - this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway); - log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", // - this.consumerGroup, // - this.mQClientFactory.getClientId(), // - mq); - } catch (Exception e) { - log.error("unlockBatchMQ exception, " + mq, e); - } - } - } - - public void unlockAll(final boolean oneway) { - HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName(); - - for (final Map.Entry<String, Set<MessageQueue>> entry : brokerMqs.entrySet()) { - final String brokerName = entry.getKey(); - final Set<MessageQueue> mqs = entry.getValue(); - - if (mqs.isEmpty()) - continue; - - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); - if (findBrokerResult != null) { - UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody(); - requestBody.setConsumerGroup(this.consumerGroup); - requestBody.setClientId(this.mQClientFactory.getClientId()); - requestBody.setMqSet(mqs); - - try { - this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway); - - for (MessageQueue mq : mqs) { - ProcessQueue processQueue = this.processQueueTable.get(mq); - if (processQueue != null) { - processQueue.setLocked(false); - log.info("the message queue unlock OK, Group: {} {}", this.consumerGroup, mq); - } - } - } catch (Exception e) { - log.error("unlockBatchMQ exception, " + mqs, e); - } - } - } - } - - private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() { - HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>(); - for (MessageQueue mq : this.processQueueTable.keySet()) { - Set<MessageQueue> mqs = result.get(mq.getBrokerName()); - if (null == mqs) { - mqs = new HashSet<MessageQueue>(); - result.put(mq.getBrokerName(), mqs); - } - - mqs.add(mq); - } - - return result; - } - - public boolean lock(final MessageQueue mq) { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); - if (findBrokerResult != null) { - LockBatchRequestBody requestBody = new LockBatchRequestBody(); - requestBody.setConsumerGroup(this.consumerGroup); - requestBody.setClientId(this.mQClientFactory.getClientId()); - requestBody.getMqSet().add(mq); - - try { - Set<MessageQueue> lockedMq = - this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); - for (MessageQueue mmqq : lockedMq) { - ProcessQueue processQueue = this.processQueueTable.get(mmqq); - if (processQueue != null) { - processQueue.setLocked(true); - processQueue.setLastLockTimestamp(System.currentTimeMillis()); - } - } - - boolean lockOK = lockedMq.contains(mq); - log.info("the message queue lock {}, {} {}", - lockOK ? "OK" : "Failed", - this.consumerGroup, - mq); - return lockOK; - } catch (Exception e) { - log.error("lockBatchMQ exception, " + mq, e); - } - } - - return false; - } - - public void lockAll() { - HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName(); - - Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, Set<MessageQueue>> entry = it.next(); - final String brokerName = entry.getKey(); - final Set<MessageQueue> mqs = entry.getValue(); - - if (mqs.isEmpty()) - continue; - - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); - if (findBrokerResult != null) { - LockBatchRequestBody requestBody = new LockBatchRequestBody(); - requestBody.setConsumerGroup(this.consumerGroup); - requestBody.setClientId(this.mQClientFactory.getClientId()); - requestBody.setMqSet(mqs); - - try { - Set<MessageQueue> lockOKMQSet = - this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); - - for (MessageQueue mq : lockOKMQSet) { - ProcessQueue processQueue = this.processQueueTable.get(mq); - if (processQueue != null) { - if (!processQueue.isLocked()) { - log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq); - } - - processQueue.setLocked(true); - processQueue.setLastLockTimestamp(System.currentTimeMillis()); - } - } - for (MessageQueue mq : mqs) { - if (!lockOKMQSet.contains(mq)) { - ProcessQueue processQueue = this.processQueueTable.get(mq); - if (processQueue != null) { - processQueue.setLocked(false); - log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq); - } - } - } - } catch (Exception e) { - log.error("lockBatchMQ exception, " + mqs, e); - } - } - } - } - - public void doRebalance(final boolean isOrder) { - Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); - if (subTable != null) { - for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { - final String topic = entry.getKey(); - try { - this.rebalanceByTopic(topic, isOrder); - } catch (Throwable e) { - if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - log.warn("rebalanceByTopic Exception", e); - } - } - } - } - - this.truncateMessageQueueNotMyTopic(); - } - - public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() { - return subscriptionInner; - } - - private void rebalanceByTopic(final String topic, final boolean isOrder) { - switch (messageModel) { - case BROADCASTING: { - Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); - if (mqSet != null) { - boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); - if (changed) { - this.messageQueueChanged(topic, mqSet, mqSet); - log.info("messageQueueChanged {} {} {} {}", // - consumerGroup, // - topic, // - mqSet, // - mqSet); - } - } else { - log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); - } - break; - } - case CLUSTERING: { - Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); - List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); - if (null == mqSet) { - if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); - } - } - - if (null == cidAll) { - log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); - } - - if (mqSet != null && cidAll != null) { - List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); - mqAll.addAll(mqSet); - - Collections.sort(mqAll); - Collections.sort(cidAll); - - AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; - - List<MessageQueue> allocateResult = null; - try { - allocateResult = strategy.allocate(// - this.consumerGroup, // - this.mQClientFactory.getClientId(), // - mqAll, // - cidAll); - } catch (Throwable e) { - log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), - e); - return; - } - - Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); - if (allocateResult != null) { - allocateResultSet.addAll(allocateResult); - } - - boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); - if (changed) { - log.info( - "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", - strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), - allocateResultSet.size(), allocateResultSet); - this.messageQueueChanged(topic, mqSet, allocateResultSet); - } - } - break; - } - default: - break; - } - } - - private void truncateMessageQueueNotMyTopic() { - Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); - - for (MessageQueue mq : this.processQueueTable.keySet()) { - if (!subTable.containsKey(mq.getTopic())) { - - ProcessQueue pq = this.processQueueTable.remove(mq); - if (pq != null) { - pq.setDropped(true); - log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq); - } - } - } - } - - private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { - boolean changed = false; - - Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<MessageQueue, ProcessQueue> next = it.next(); - MessageQueue mq = next.getKey(); - ProcessQueue pq = next.getValue(); - - if (mq.getTopic().equals(topic)) { - if (!mqSet.contains(mq)) { - pq.setDropped(true); - if (this.removeUnnecessaryMessageQueue(mq, pq)) { - it.remove(); - changed = true; - log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); - } - } else if (pq.isPullExpired()) { - switch (this.consumeType()) { - case CONSUME_ACTIVELY: - break; - case CONSUME_PASSIVELY: - pq.setDropped(true); - if (this.removeUnnecessaryMessageQueue(mq, pq)) { - it.remove(); - changed = true; - log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", - consumerGroup, mq); - } - break; - default: - break; - } - } - } - } - - List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); - for (MessageQueue mq : mqSet) { - if (!this.processQueueTable.containsKey(mq)) { - if (isOrder && !this.lock(mq)) { - log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); - continue; - } - - this.removeDirtyOffset(mq); - ProcessQueue pq = new ProcessQueue(); - long nextOffset = this.computePullFromWhere(mq); - if (nextOffset >= 0) { - ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); - if (pre != null) { - log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); - } else { - log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); - PullRequest pullRequest = new PullRequest(); - pullRequest.setConsumerGroup(consumerGroup); - pullRequest.setNextOffset(nextOffset); - pullRequest.setMessageQueue(mq); - pullRequest.setProcessQueue(pq); - pullRequestList.add(pullRequest); - changed = true; - } - } else { - log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); - } - } - } - - this.dispatchPullRequest(pullRequestList); - - return changed; - } - - public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided); - - public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq); - - public abstract ConsumeType consumeType(); - - public abstract void removeDirtyOffset(final MessageQueue mq); - - public abstract long computePullFromWhere(final MessageQueue mq); - - public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList); - - public void removeProcessQueue(final MessageQueue mq) { - ProcessQueue prev = this.processQueueTable.remove(mq); - if (prev != null) { - boolean droped = prev.isDropped(); - prev.setDropped(true); - this.removeUnnecessaryMessageQueue(mq, prev); - log.info("Fix Offset, {}, remove unnecessary mq, {} Droped: {}", consumerGroup, mq, droped); - } - } - - public ConcurrentHashMap<MessageQueue, ProcessQueue> getProcessQueueTable() { - return processQueueTable; - } - - - public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() { - return topicSubscribeInfoTable; - } - - - public String getConsumerGroup() { - return consumerGroup; - } - - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - - public MessageModel getMessageModel() { - return messageModel; - } - - - public void setMessageModel(MessageModel messageModel) { - this.messageModel = messageModel; - } - - - public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { - return allocateMessageQueueStrategy; - } - - - public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) { - this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; - } - - - public MQClientInstance getmQClientFactory() { - return mQClientFactory; - } - - - public void setmQClientFactory(MQClientInstance mQClientFactory) { - this.mQClientFactory = mQClientFactory; - } - - - public void destroy() { - Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<MessageQueue, ProcessQueue> next = it.next(); - next.getValue().setDropped(true); - } - - this.processQueueTable.clear(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java deleted file mode 100644 index 8d2b465..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import com.alibaba.rocketmq.client.consumer.MessageQueueListener; -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; - -import java.util.List; -import java.util.Set; - - -/** - * @author shijia.wxr - */ -public class RebalancePullImpl extends RebalanceImpl { - private final DefaultMQPullConsumerImpl defaultMQPullConsumerImpl; - - - public RebalancePullImpl(DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) { - this(null, null, null, null, defaultMQPullConsumerImpl); - } - - - public RebalancePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, - MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) { - super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); - this.defaultMQPullConsumerImpl = defaultMQPullConsumerImpl; - } - - @Override - public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { - MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener(); - if (messageQueueListener != null) { - try { - messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided); - } catch (Throwable e) { - log.error("messageQueueChanged exception", e); - } - } - } - - @Override - public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { - this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq); - this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); - return true; - } - - @Override - public ConsumeType consumeType() { - return ConsumeType.CONSUME_ACTIVELY; - } - - @Override - public void removeDirtyOffset(final MessageQueue mq) { - this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); - } - - @Override - public long computePullFromWhere(MessageQueue mq) { - return 0; - } - - @Override - public void dispatchPullRequest(List<PullRequest> pullRequestList) { - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java deleted file mode 100644 index 2377d29..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import com.alibaba.rocketmq.client.consumer.store.OffsetStore; -import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType; -import com.alibaba.rocketmq.client.exception.MQClientException; -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.common.MixAll; -import com.alibaba.rocketmq.common.UtilAll; -import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; -import com.alibaba.rocketmq.common.message.MessageQueue; -import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType; -import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - - -/** - * @author shijia.wxr - */ -public class RebalancePushImpl extends RebalanceImpl { - private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000")); - private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; - - - public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { - this(null, null, null, null, defaultMQPushConsumerImpl); - } - - - public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, - MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { - super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); - this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; - } - - @Override - public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { - } - - @Override - public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { - this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); - this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); - if (this.defaultMQPushConsumerImpl.isConsumeOrderly() - && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { - try { - if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) { - try { - return this.unlockDelay(mq, pq); - } finally { - pq.getLockConsume().unlock(); - } - } else { - log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", // - mq, // - pq.getTryUnlockTimes()); - - pq.incTryUnlockTimes(); - } - } catch (Exception e) { - log.error("removeUnnecessaryMessageQueue Exception", e); - } - - return false; - } - return true; - } - - private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) { - - if (pq.hasTempMessage()) { - log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq); - this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() { - @Override - public void run() { - log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq); - RebalancePushImpl.this.unlock(mq, true); - } - }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS); - } else { - this.unlock(mq, true); - } - return true; - } - - @Override - public ConsumeType consumeType() { - return ConsumeType.CONSUME_PASSIVELY; - } - - @Override - public void removeDirtyOffset(final MessageQueue mq) { - this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); - } - - @Override - public long computePullFromWhere(MessageQueue mq) { - long result = -1; - final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); - final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); - switch (consumeFromWhere) { - case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: - case CONSUME_FROM_MIN_OFFSET: - case CONSUME_FROM_MAX_OFFSET: - case CONSUME_FROM_LAST_OFFSET: { - long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); - if (lastOffset >= 0) { - result = lastOffset; - } - // First start,no offset - else if (-1 == lastOffset) { - if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - result = 0L; - } else { - try { - result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); - } catch (MQClientException e) { - result = -1; - } - } - } else { - result = -1; - } - break; - } - case CONSUME_FROM_FIRST_OFFSET: { - long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); - if (lastOffset >= 0) { - result = lastOffset; - } else if (-1 == lastOffset) { - result = 0L; - } else { - result = -1; - } - break; - } - case CONSUME_FROM_TIMESTAMP: { - long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); - if (lastOffset >= 0) { - result = lastOffset; - } else if (-1 == lastOffset) { - if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { - try { - result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); - } catch (MQClientException e) { - result = -1; - } - } else { - try { - long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), - UtilAll.YYYY_MMDD_HHMMSS).getTime(); - result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); - } catch (MQClientException e) { - result = -1; - } - } - } else { - result = -1; - } - break; - } - - default: - break; - } - - return result; - } - - @Override - public void dispatchPullRequest(List<PullRequest> pullRequestList) { - for (PullRequest pullRequest : pullRequestList) { - this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); - log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java deleted file mode 100644 index 47a9da5..0000000 --- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.rocketmq.client.impl.consumer; - -import com.alibaba.rocketmq.client.impl.factory.MQClientInstance; -import com.alibaba.rocketmq.client.log.ClientLogger; -import com.alibaba.rocketmq.common.ServiceThread; -import org.slf4j.Logger; - - -/** - * Rebalance Service - * - * @author shijia.wxr - */ -public class RebalanceService extends ServiceThread { - private static long waitInterval = - Long.parseLong(System.getProperty( - "rocketmq.client.rebalance.waitInterval", "20000")); - private final Logger log = ClientLogger.getLog(); - private final MQClientInstance mqClientFactory; - - public RebalanceService(MQClientInstance mqClientFactory) { - this.mqClientFactory = mqClientFactory; - } - - @Override - public void run() { - log.info(this.getServiceName() + " service started"); - - while (!this.isStopped()) { - this.waitForRunning(waitInterval); - this.mqClientFactory.doRebalance(); - } - - log.info(this.getServiceName() + " service end"); - } - - - @Override - public String getServiceName() { - return RebalanceService.class.getSimpleName(); - } -}