http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 4241c0e..c22c515 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -16,6 +16,17 @@ */ package org.apache.rocketmq.client.impl.consumer; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -44,7 +55,11 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; -import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ConsumeStatus; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; @@ -60,11 +75,6 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.slf4j.Logger; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - - public class DefaultMQPushConsumerImpl implements MQConsumerInner { /** * Delay some time when exception occur @@ -98,7 +108,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private long flowControlTimes1 = 0; private long flowControlTimes2 = 0; - public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) { this.defaultMQPushConsumer = defaultMQPushConsumer; this.rpcHook = rpcHook; @@ -214,8 +223,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((flowControlTimes1++ % 1000) == 0) { log.warn( - "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}", - processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1); + "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}", + processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1); } return; } @@ -225,9 +234,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((flowControlTimes2++ % 1000) == 0) { log.warn( - "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", - processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), - pullRequest, flowControlTimes2); + "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", + processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), + pullRequest, flowControlTimes2); } return; } @@ -237,10 +246,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", - pullRequest, offset, brokerBusy); + pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", - pullRequest, offset); + pullRequest, offset); } pullRequest.setLockedFirst(true); @@ -267,7 +276,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, - subscriptionData); + subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: @@ -275,7 +284,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), - pullRequest.getMessageQueue().getTopic(), pullRT); + pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { @@ -284,30 +293,30 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), - pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); + pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// - pullResult.getMsgFoundList(), // - processQueue, // - pullRequest.getMessageQueue(), // - dispathToConsume); + pullResult.getMsgFoundList(), // + processQueue, // + pullRequest.getMessageQueue(), // + dispathToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, - DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); + DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset// - || firstMsgOffset < prevRequestOffset) { + || firstMsgOffset < prevRequestOffset) { log.warn( - "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", // - pullResult.getNextBeginOffset(), // - firstMsgOffset, // - prevRequestOffset); + "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", // + pullResult.getNextBeginOffset(), // + firstMsgOffset, // + prevRequestOffset); } break; @@ -327,7 +336,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", // - pullRequest.toString(), pullResult.toString()); + pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); @@ -337,7 +346,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), - pullRequest.getNextOffset(), false); + pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); @@ -356,7 +365,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } - @Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { @@ -388,24 +396,24 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } int sysFlag = PullSysFlag.buildSysFlag(// - commitOffsetEnable, // commitOffset - true, // suspend - subExpression != null, // subscription - classFilter // class filter + commitOffsetEnable, // commitOffset + true, // suspend + subExpression != null, // subscription + classFilter // class filter ); try { this.pullAPIWrapper.pullKernelImpl(// - pullRequest.getMessageQueue(), // 1 - subExpression, // 2 - subscriptionData.getSubVersion(), // 3 - pullRequest.getNextOffset(), // 4 - this.defaultMQPushConsumer.getPullBatchSize(), // 5 - sysFlag, // 6 - commitOffsetValue, // 7 - BROKER_SUSPEND_MAX_TIME_MILLIS, // 8 - CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9 - CommunicationMode.ASYNC, // 10 - pullCallback// 11 + pullRequest.getMessageQueue(), // 1 + subExpression, // 2 + subscriptionData.getSubVersion(), // 3 + pullRequest.getNextOffset(), // 4 + this.defaultMQPushConsumer.getPullBatchSize(), // 5 + sysFlag, // 6 + commitOffsetValue, // 7 + BROKER_SUSPEND_MAX_TIME_MILLIS, // 8 + CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9 + CommunicationMode.ASYNC, // 10 + pullCallback// 11 ); } catch (Exception e) { log.error("pullKernelImpl exception", e); @@ -416,9 +424,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The consumer service state not OK, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); + + this.serviceState// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); } } @@ -453,16 +461,15 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { + throws MQClientException, InterruptedException { return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); } public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException, - InterruptedException { + InterruptedException { return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey); } - public void registerMessageListener(MessageListener messageListener) { this.messageListenerInner = messageListener; } @@ -474,12 +481,12 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) - : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); + : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, - this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); + this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e); @@ -532,7 +539,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), - this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); + this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; this.checkConfig(); @@ -551,8 +558,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper( - mQClientFactory, - this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); + mQClientFactory, + this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPushConsumer.getOffsetStore() != null) { @@ -574,11 +581,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = - new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); + new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = - new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); + new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner()); } this.consumeMessageService.start(); @@ -588,8 +595,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() - + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), - null); + + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), + null); } mQClientFactory.start(); @@ -600,9 +607,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, "// - + this.serviceState// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), - null); + + this.serviceState// + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); default: break; } @@ -619,133 +626,133 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { if (null == this.defaultMQPushConsumer.getConsumerGroup()) { throw new MQClientException( - "consumerGroup is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "consumerGroup is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { throw new MQClientException( - "consumerGroup can not equal " - + MixAll.DEFAULT_CONSUMER_GROUP - + ", please specify another one." - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "consumerGroup can not equal " + + MixAll.DEFAULT_CONSUMER_GROUP + + ", please specify another one." + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } if (null == this.defaultMQPushConsumer.getMessageModel()) { throw new MQClientException( - "messageModel is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "messageModel is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) { throw new MQClientException( - "consumeFromWhere is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "consumeFromWhere is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYY_MMDD_HHMMSS); if (null == dt) { throw new MQClientException( - "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } // allocateMessageQueueStrategy if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) { throw new MQClientException( - "allocateMessageQueueStrategy is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "allocateMessageQueueStrategy is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } // subscription if (null == this.defaultMQPushConsumer.getSubscription()) { throw new MQClientException( - "subscription is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "subscription is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } // messageListener if (null == this.defaultMQPushConsumer.getMessageListener()) { throw new MQClientException( - "messageListener is null" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "messageListener is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly; boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently; if (!orderly && !concurrently) { throw new MQClientException( - "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } // consumeThreadMin if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1 - || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000 - || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) { + || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000 + || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) { throw new MQClientException( - "consumeThreadMin Out of range [1, 1000]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "consumeThreadMin Out of range [1, 1000]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } // consumeThreadMax if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) { throw new MQClientException( - "consumeThreadMax Out of range [1, 1000]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "consumeThreadMax Out of range [1, 1000]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } // consumeConcurrentlyMaxSpan if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1 - || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) { + || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) { throw new MQClientException( - "consumeConcurrentlyMaxSpan Out of range [1, 65535]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "consumeConcurrentlyMaxSpan Out of range [1, 65535]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } // pullThresholdForQueue if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) { throw new MQClientException( - "pullThresholdForQueue Out of range [1, 65535]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "pullThresholdForQueue Out of range [1, 65535]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } // pullInterval if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) { throw new MQClientException( - "pullInterval Out of range [0, 65535]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "pullInterval Out of range [0, 65535]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } // consumeMessageBatchMaxSize if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1 - || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) { + || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) { throw new MQClientException( - "consumeMessageBatchMaxSize Out of range [1, 1024]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "consumeMessageBatchMaxSize Out of range [1, 1024]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } // pullBatchSize if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) { throw new MQClientException( - "pullBatchSize Out of range [1, 1024]" - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), - null); + "pullBatchSize Out of range [1, 1024]" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); } } @@ -757,7 +764,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // - topic, subString); + topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } @@ -772,7 +779,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // - retryTopic, SubscriptionData.SUB_ALL); + retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: @@ -804,7 +811,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void subscribe(String topic, String subExpression) throws MQClientException { try { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // - topic, subExpression); + topic, subExpression); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); @@ -817,7 +824,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { try { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // - topic, "*"); + topic, "*"); subscriptionData.setSubString(fullClassName); subscriptionData.setClassFilterMode(true); subscriptionData.setFilterClassSource(filterClassSource); @@ -865,7 +872,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } public void resetOffsetByTimeStamp(long timeStamp) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) { Set<MessageQueue> mqs = rebalanceImpl.getTopicSubscribeInfoTable().get(topic); Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); @@ -1017,9 +1024,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { long computeAccTotal = this.computeAccumulationTotal(); long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold(); - long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0); + long incThreshold = (long)(adjustThreadPoolNumsThreshold * 1.0); - long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8); + long decThreshold = (long)(adjustThreadPoolNumsThreshold * 0.8); if (computeAccTotal >= incThreshold) { this.consumeMessageService.incCorePoolSize(); @@ -1044,7 +1051,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic) - throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + throws RemotingException, MQClientException, InterruptedException, MQBrokerException { List<QueueTimeSpan> queueTimeSpan = new ArrayList<QueueTimeSpan>(); TopicRouteData routeData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000); for (BrokerData brokerData : routeData.getBrokerDatas()) { @@ -1055,12 +1062,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { return queueTimeSpan; } - public ConsumeMessageService getConsumeMessageService() { return consumeMessageService; } - public void setConsumeMessageService(ConsumeMessageService consumeMessageService) { this.consumeMessageService = consumeMessageService;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java index b1a2a25..ce4f2b9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java @@ -6,16 +6,17 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl.consumer; +import java.util.Set; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; @@ -23,9 +24,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; -import java.util.Set; - - /** * Consumer inner interface * @@ -33,33 +31,23 @@ import java.util.Set; public interface MQConsumerInner { String groupName(); - MessageModel messageModel(); - ConsumeType consumeType(); - ConsumeFromWhere consumeFromWhere(); - Set<SubscriptionData> subscriptions(); - void doRebalance(); - void persistConsumerOffset(); - void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info); - boolean isSubscribeTopicNeedUpdate(final String topic); - boolean isUnitMode(); - ConsumerRunningInfo consumerRunningInfo(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java index 1e573c3..47ae2b0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java @@ -6,20 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl.consumer; -import org.apache.rocketmq.common.message.MessageQueue; - import java.util.concurrent.ConcurrentHashMap; - +import org.apache.rocketmq.common.message.MessageQueue; /** * Message lock,strictly ensure the single queue only one thread at a time consuming @@ -27,8 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class MessageQueueLock { private ConcurrentHashMap<MessageQueue, Object> mqLockTable = - new ConcurrentHashMap<MessageQueue, Object>(); - + new ConcurrentHashMap<MessageQueue, Object>(); public Object fetchLockObject(final MessageQueue mq) { Object objLock = this.mqLockTable.get(mq); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index f361f1f..2d17703 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -6,24 +6,16 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl.consumer; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.common.message.MessageAccessor; -import org.apache.rocketmq.common.message.MessageConst; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; -import org.slf4j.Logger; - import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -33,7 +25,13 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; - +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; +import org.slf4j.Logger; /** * Queue consumption snapshot @@ -41,7 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; */ public class ProcessQueue { public final static long REBALANCE_LOCK_MAX_LIVE_TIME = - Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000")); + Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000")); public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000")); private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000")); private final Logger log = ClientLogger.getLog(); @@ -65,7 +63,6 @@ public class ProcessQueue { return result; } - public boolean isPullExpired() { boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME; return result; @@ -80,7 +77,7 @@ public class ProcessQueue { if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { return; } - + int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16; for (int i = 0; i < loop; i++) { MessageExt msg = null; @@ -126,7 +123,6 @@ public class ProcessQueue { } } - public boolean putMessage(final List<MessageExt> msgs) { boolean dispatchToConsume = false; try { @@ -167,7 +163,6 @@ public class ProcessQueue { return dispatchToConsume; } - public long getMaxSpan() { try { this.lockTreeMap.readLock().lockInterruptibly(); @@ -185,7 +180,6 @@ public class ProcessQueue { return 0; } - public long removeMessage(final List<MessageExt> msgs) { long result = -1; final long now = System.currentTimeMillis(); @@ -218,22 +212,18 @@ public class ProcessQueue { return result; } - public TreeMap<Long, MessageExt> getMsgTreeMap() { return msgTreeMap; } - public AtomicLong getMsgCount() { return msgCount; } - public boolean isDropped() { return dropped; } - public void setDropped(boolean dropped) { this.dropped = dropped; } @@ -260,7 +250,6 @@ public class ProcessQueue { } } - public long commit() { try { this.lockTreeMap.writeLock().lockInterruptibly(); @@ -281,7 +270,6 @@ public class ProcessQueue { return -1; } - public void makeMessageToCosumeAgain(List<MessageExt> msgs) { try { this.lockTreeMap.writeLock().lockInterruptibly(); @@ -298,7 +286,6 @@ public class ProcessQueue { } } - public List<MessageExt> takeMessags(final int batchSize) { List<MessageExt> result = new ArrayList<MessageExt>(batchSize); final long now = System.currentTimeMillis(); @@ -331,7 +318,6 @@ public class ProcessQueue { return result; } - public boolean hasTempMessage() { try { this.lockTreeMap.readLock().lockInterruptibly(); @@ -346,7 +332,6 @@ public class ProcessQueue { return true; } - public void clear() { try { this.lockTreeMap.writeLock().lockInterruptibly(); @@ -363,52 +348,42 @@ public class ProcessQueue { } } - public long getLastLockTimestamp() { return lastLockTimestamp; } - public void setLastLockTimestamp(long lastLockTimestamp) { this.lastLockTimestamp = lastLockTimestamp; } - public Lock getLockConsume() { return lockConsume; } - public long getLastPullTimestamp() { return lastPullTimestamp; } - public void setLastPullTimestamp(long lastPullTimestamp) { this.lastPullTimestamp = lastPullTimestamp; } - public long getMsgAccCnt() { return msgAccCnt; } - public void setMsgAccCnt(long msgAccCnt) { this.msgAccCnt = msgAccCnt; } - public long getTryUnlockTimes() { return this.tryUnlockTimes.get(); } - public void incTryUnlockTimes() { this.tryUnlockTimes.incrementAndGet(); } - public void fillProcessQueueInfo(final ProcessQueueInfo info) { try { this.lockTreeMap.readLock().lockInterruptibly(); @@ -438,12 +413,10 @@ public class ProcessQueue { } } - public long getLastConsumeTimestamp() { return lastConsumeTimestamp; } - public void setLastConsumeTimestamp(long lastConsumeTimestamp) { this.lastConsumeTimestamp = lastConsumeTimestamp; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 59c9b1c..d358175 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -16,6 +16,12 @@ */ package org.apache.rocketmq.client.impl.consumer; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; @@ -28,7 +34,11 @@ import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; @@ -36,21 +46,13 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.remoting.exception.RemotingException; import org.slf4j.Logger; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - - public class PullAPIWrapper { private final Logger log = ClientLogger.getLog(); private final MQClientInstance mQClientFactory; private final String consumerGroup; private final boolean unitMode; private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = - new ConcurrentHashMap<MessageQueue, AtomicLong>(32); + new ConcurrentHashMap<MessageQueue, AtomicLong>(32); private volatile boolean connectBrokerByUser = false; private volatile long defaultBrokerId = MixAll.MASTER_ID; private Random random = new Random(System.currentTimeMillis()); @@ -63,8 +65,8 @@ public class PullAPIWrapper { } public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, - final SubscriptionData subscriptionData) { - PullResultExt pullResultExt = (PullResultExt) pullResult; + final SubscriptionData subscriptionData) { + PullResultExt pullResultExt = (PullResultExt)pullResult; this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); if (PullStatus.FOUND == pullResult.getPullStatus()) { @@ -92,9 +94,9 @@ public class PullAPIWrapper { for (MessageExt msg : msgListFilterAgain) { MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, - Long.toString(pullResult.getMinOffset())); + Long.toString(pullResult.getMinOffset())); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, - Long.toString(pullResult.getMaxOffset())); + Long.toString(pullResult.getMaxOffset())); } pullResultExt.setMsgFoundList(msgListFilterAgain); @@ -131,26 +133,26 @@ public class PullAPIWrapper { } public PullResult pullKernelImpl( - final MessageQueue mq, - final String subExpression, - final long subVersion, - final long offset, - final int maxNums, - final int sysFlag, - final long commitOffset, - final long brokerSuspendMaxTimeMillis, - final long timeoutMillis, - final CommunicationMode communicationMode, - final PullCallback pullCallback + final MessageQueue mq, + final String subExpression, + final long subVersion, + final long offset, + final int maxNums, + final int sysFlag, + final long commitOffset, + final long brokerSuspendMaxTimeMillis, + final long timeoutMillis, + final CommunicationMode communicationMode, + final PullCallback pullCallback ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { FindBrokerResult findBrokerResult = - this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), - this.recalculatePullFromWhichNode(mq), false); + this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), + this.recalculatePullFromWhichNode(mq), false); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = - this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), - this.recalculatePullFromWhichNode(mq), false); + this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), + this.recalculatePullFromWhichNode(mq), false); } if (findBrokerResult != null) { @@ -178,11 +180,11 @@ public class PullAPIWrapper { } PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( - brokerAddr, - requestHeader, - timeoutMillis, - communicationMode, - pullCallback); + brokerAddr, + requestHeader, + timeoutMillis, + communicationMode, + pullCallback); return pullResult; } @@ -204,7 +206,7 @@ public class PullAPIWrapper { } private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) - throws MQClientException { + throws MQClientException { ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable(); if (topicRouteTable != null) { TopicRouteData topicRouteData = topicRouteTable.get(topic); @@ -216,13 +218,18 @@ public class PullAPIWrapper { } throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: " - + topic, null); + + topic, null); } public boolean isConnectBrokerByUser() { return connectBrokerByUser; } + public void setConnectBrokerByUser(boolean connectBrokerByUser) { + this.connectBrokerByUser = connectBrokerByUser; + + } + public int randomNum() { int value = random.nextInt(); if (value < 0) { @@ -233,11 +240,6 @@ public class PullAPIWrapper { return value; } - public void setConnectBrokerByUser(boolean connectBrokerByUser) { - this.connectBrokerByUser = connectBrokerByUser; - - } - public void registerFilterMessageHook(ArrayList<FilterMessageHook> filterMessageHookList) { this.filterMessageHookList = filterMessageHookList; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java index 4634c24..55e3d59 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java @@ -6,35 +6,37 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl.consumer; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.ServiceThread; import org.slf4j.Logger; -import java.util.concurrent.*; - - public class PullMessageService extends ServiceThread { private final Logger log = ClientLogger.getLog(); private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>(); private final MQClientInstance mQClientFactory; private final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "PullMessageServiceScheduledThread"); - } - }); + .newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "PullMessageServiceScheduledThread"); + } + }); public PullMessageService(MQClientInstance mQClientFactory) { this.mQClientFactory = mQClientFactory; @@ -69,14 +71,13 @@ public class PullMessageService extends ServiceThread { private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { - DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; + DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl)consumer; impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } } - @Override public void run() { log.info(this.getServiceName() + " service started"); @@ -96,11 +97,9 @@ public class PullMessageService extends ServiceThread { log.info(this.getServiceName() + " service end"); } - @Override public String getServiceName() { return PullMessageService.class.getSimpleName(); } - } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java index ccc624b..4850313 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl.consumer; import org.apache.rocketmq.common.message.MessageQueue; - public class PullRequest { private String consumerGroup; private MessageQueue messageQueue; @@ -38,27 +37,22 @@ public class PullRequest { return consumerGroup; } - public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } - public MessageQueue getMessageQueue() { return messageQueue; } - public void setMessageQueue(MessageQueue messageQueue) { this.messageQueue = messageQueue; } - public long getNextOffset() { return nextOffset; } - public void setNextOffset(long nextOffset) { this.nextOffset = nextOffset; } @@ -80,7 +74,7 @@ public class PullRequest { return false; if (getClass() != obj.getClass()) return false; - PullRequest other = (PullRequest) obj; + PullRequest other = (PullRequest)obj; if (consumerGroup == null) { if (other.consumerGroup != null) return false; @@ -97,14 +91,13 @@ public class PullRequest { @Override public String toString() { return "PullRequest [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue - + ", nextOffset=" + nextOffset + "]"; + + ", nextOffset=" + nextOffset + "]"; } public ProcessQueue getProcessQueue() { return processQueue; } - public void setProcessQueue(ProcessQueue processQueue) { this.processQueue = processQueue; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java index d248603..c43c9a9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java @@ -6,46 +6,40 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl.consumer; +import java.util.List; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.message.MessageExt; -import java.util.List; - - public class PullResultExt extends PullResult { private final long suggestWhichBrokerId; private byte[] messageBinary; - public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset, - List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) { + List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) { super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList); this.suggestWhichBrokerId = suggestWhichBrokerId; this.messageBinary = messageBinary; } - public byte[] getMessageBinary() { return messageBinary; } - public void setMessageBinary(byte[] messageBinary) { this.messageBinary = messageBinary; } - public long getSuggestWhichBrokerId() { return suggestWhichBrokerId; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 53d775f..91bfd1a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -16,6 +16,16 @@ */ package org.apache.rocketmq.client.impl.consumer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -29,30 +39,23 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.slf4j.Logger; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; - - /** * Base class for rebalance algorithm - * */ public abstract class RebalanceImpl { protected static final Logger log = ClientLogger.getLog(); protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = - new ConcurrentHashMap<String, Set<MessageQueue>>(); + new ConcurrentHashMap<String, Set<MessageQueue>>(); protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner = - new ConcurrentHashMap<String, SubscriptionData>(); + new ConcurrentHashMap<String, SubscriptionData>(); protected String consumerGroup; protected MessageModel messageModel; protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; protected MQClientInstance mQClientFactory; - public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, - MQClientInstance mQClientFactory) { + MQClientInstance mQClientFactory) { this.consumerGroup = consumerGroup; this.messageModel = messageModel; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; @@ -70,9 +73,9 @@ public abstract class RebalanceImpl { try { this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway); log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", // - this.consumerGroup, // - this.mQClientFactory.getClientId(), // - mq); + this.consumerGroup, // + this.mQClientFactory.getClientId(), // + mq); } catch (Exception e) { log.error("unlockBatchMQ exception, " + mq, e); } @@ -138,7 +141,7 @@ public abstract class RebalanceImpl { try { Set<MessageQueue> lockedMq = - this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); + this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); for (MessageQueue mmqq : lockedMq) { ProcessQueue processQueue = this.processQueueTable.get(mmqq); if (processQueue != null) { @@ -149,9 +152,9 @@ public abstract class RebalanceImpl { boolean lockOK = lockedMq.contains(mq); log.info("the message queue lock {}, {} {}", - lockOK ? "OK" : "Failed", - this.consumerGroup, - mq); + lockOK ? "OK" : "Failed", + this.consumerGroup, + mq); return lockOK; } catch (Exception e) { log.error("lockBatchMQ exception, " + mq, e); @@ -182,7 +185,7 @@ public abstract class RebalanceImpl { try { Set<MessageQueue> lockOKMQSet = - this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); + this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000); for (MessageQueue mq : lockOKMQSet) { ProcessQueue processQueue = this.processQueueTable.get(mq); @@ -242,10 +245,10 @@ public abstract class RebalanceImpl { if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", // - consumerGroup, // - topic, // - mqSet, // - mqSet); + consumerGroup, // + topic, // + mqSet, // + mqSet); } } else { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); @@ -277,13 +280,13 @@ public abstract class RebalanceImpl { List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate(// - this.consumerGroup, // - this.mQClientFactory.getClientId(), // - mqAll, // - cidAll); + this.consumerGroup, // + this.mQClientFactory.getClientId(), // + mqAll, // + cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), - e); + e); return; } @@ -295,9 +298,9 @@ public abstract class RebalanceImpl { boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( - "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", - strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), - allocateResultSet.size(), allocateResultSet); + "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", + strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), + allocateResultSet.size(), allocateResultSet); this.messageQueueChanged(topic, mqSet, allocateResultSet); } } @@ -350,7 +353,7 @@ public abstract class RebalanceImpl { it.remove(); changed = true; log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", - consumerGroup, mq); + consumerGroup, mq); } break; default: @@ -422,52 +425,42 @@ public abstract class RebalanceImpl { return processQueueTable; } - public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() { return topicSubscribeInfoTable; } - public String getConsumerGroup() { return consumerGroup; } - public void setConsumerGroup(String consumerGroup) { this.consumerGroup = consumerGroup; } - public MessageModel getMessageModel() { return messageModel; } - public void setMessageModel(MessageModel messageModel) { this.messageModel = messageModel; } - public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { return allocateMessageQueueStrategy; } - public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; } - public MQClientInstance getmQClientFactory() { return mQClientFactory; } - public void setmQClientFactory(MQClientInstance mQClientFactory) { this.mQClientFactory = mQClientFactory; } - public void destroy() { Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); while (it.hasNext()) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java index d37090d..1130943 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java @@ -6,16 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl.consumer; +import java.util.List; +import java.util.Set; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -23,21 +25,15 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import java.util.List; -import java.util.Set; - - public class RebalancePullImpl extends RebalanceImpl { private final DefaultMQPullConsumerImpl defaultMQPullConsumerImpl; - public RebalancePullImpl(DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) { this(null, null, null, null, defaultMQPullConsumerImpl); } - public RebalancePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, - MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) { + MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) { super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); this.defaultMQPullConsumerImpl = defaultMQPullConsumerImpl; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 4d0d47f..707b9a1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -6,16 +6,19 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl.consumer; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; @@ -28,23 +31,16 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - - public class RebalancePushImpl extends RebalanceImpl { private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000")); private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; - public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { this(null, null, null, null, defaultMQPushConsumerImpl); } - public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, - MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { + MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory); this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; } @@ -58,7 +54,7 @@ public class RebalancePushImpl extends RebalanceImpl { this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); if (this.defaultMQPushConsumerImpl.isConsumeOrderly() - && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { + && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { try { if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) { try { @@ -68,8 +64,8 @@ public class RebalancePushImpl extends RebalanceImpl { } } else { log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", // - mq, // - pq.getTryUnlockTimes()); + mq, // + pq.getTryUnlockTimes()); pq.incTryUnlockTimes(); } @@ -164,7 +160,7 @@ public class RebalancePushImpl extends RebalanceImpl { } else { try { long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), - UtilAll.YYYY_MMDD_HHMMSS).getTime(); + UtilAll.YYYY_MMDD_HHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { result = -1; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java index 5b5ab2a..985129e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.impl.consumer; @@ -21,15 +21,14 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.ServiceThread; import org.slf4j.Logger; - /** * Rebalance Service * */ public class RebalanceService extends ServiceThread { private static long waitInterval = - Long.parseLong(System.getProperty( - "rocketmq.client.rebalance.waitInterval", "20000")); + Long.parseLong(System.getProperty( + "rocketmq.client.rebalance.waitInterval", "20000")); private final Logger log = ClientLogger.getLog(); private final MQClientInstance mqClientFactory; @@ -49,7 +48,6 @@ public class RebalanceService extends ServiceThread { log.info(this.getServiceName() + " service end"); } - @Override public String getServiceName() { return RebalanceService.class.getSimpleName();