qqy-might opened a new issue #2858:
URL: https://github.com/apache/rocketmq/issues/2858


   
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage 
没有遵循编码规约:
   1、方法过长,需要按层次组织;
   2、代码层级过深,形成“大于号”风格。
   
   
   /**
    * Runs one pull cycle for the queue described by {@code pullRequest}.
    *
    * <p>The method first walks through a series of guards. If the process queue is dropped it
    * returns without re-scheduling. If the consumer state check fails, the consumer is paused,
    * a flow-control threshold is exceeded, an orderly queue is not locked, or no subscription
    * is found for the topic, the request is re-submitted through
    * {@code executePullRequestLater} and the method returns without pulling.
    *
    * <p>Otherwise it builds a {@code PullCallback}, computes the commit-offset and subscription
    * flags, and calls {@code pullAPIWrapper.pullKernelImpl} with {@code CommunicationMode.ASYNC}.
    * The callback re-submits the same request (immediately or later) for every pull status
    * except {@code OFFSET_ILLEGAL}, where the process queue is marked dropped and a delayed
    * task fixes the stored offset instead.
    *
    * @param pullRequest carries the message queue, its process queue and the next offset to pull
    */
   public void pullMessage(final PullRequest pullRequest) {
           final ProcessQueue processQueue = pullRequest.getProcessQueue();
           // A dropped process queue ends the cycle here: the request is NOT re-scheduled.
           if (processQueue.isDropped()) {
               log.info("the pull request[{}] is dropped.", 
pullRequest.toString());
               return;
           }
   
           // Stamp the process queue with the time of this pull attempt.
           
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
   
           // Consumer state check; on failure retry after pullTimeDelayMillsWhenException.
           try {
               this.makeSureStateOK();
           } catch (MQClientException e) {
               log.warn("pullMessage exception, consumer state not ok", e);
               this.executePullRequestLater(pullRequest, 
pullTimeDelayMillsWhenException);
               return;
           }
   
           // While paused, do not pull; try again after PULL_TIME_DELAY_MILLS_WHEN_SUSPEND.
           if (this.isPause()) {
               log.warn("consumer was paused, execute pull request later. 
instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), 
this.defaultMQPushConsumer.getConsumerGroup());
               this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
               return;
           }
   
           // Backlog currently held by the process queue. The size is divided by 1024 * 1024
           // with integer division (presumably bytes -> MiB, as the variable name suggests).
           long cachedMessageCount = processQueue.getMsgCount().get();
           long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / 
(1024 * 1024);
   
           // Flow control on cached message count: delay the pull. The warning is emitted only
           // when the counter (before increment) is a multiple of 1000.
           if (cachedMessageCount > 
this.defaultMQPushConsumer.getPullThresholdForQueue()) {
               this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
               if ((queueFlowControlTimes++ % 1000) == 0) {
                   log.warn(
                       "the cached message count exceeds the threshold {}, so 
do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
pullRequest={}, flowControlTimes={}",
                       this.defaultMQPushConsumer.getPullThresholdForQueue(), 
processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, 
cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
               }
               return;
           }
   
           // Flow control on cached message size; shares queueFlowControlTimes with the
           // count-based check above.
           if (cachedMessageSizeInMiB > 
this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
               this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
               if ((queueFlowControlTimes++ % 1000) == 0) {
                   log.warn(
                       "the cached message size exceeds the threshold {} MiB, 
so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
pullRequest={}, flowControlTimes={}",
                       
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), 
processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, 
cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
               }
               return;
           }
   
           if (!this.consumeOrderly) {
               // Non-orderly consumption: delay the pull when the process queue's max span
               // exceeds consumeConcurrentlyMaxSpan (uses its own throttled-log counter).
               if (processQueue.getMaxSpan() > 
this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                   this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                   if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                       log.warn(
                           "the queue's messages, span too long, so do flow 
control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, 
flowControlTimes={}",
                           processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                           pullRequest, queueMaxSpanFlowControlTimes);
                   }
                   return;
               }
           } else {
               // Orderly consumption: pulling is only allowed while the process queue is locked.
               if (processQueue.isLocked()) {
                   // First pull after the lock was obtained: replace the request's next offset
                   // with the one computed by rebalanceImpl, and remember that this was done.
                   if (!pullRequest.isLockedFirst()) {
                       final long offset = 
this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                       boolean brokerBusy = offset < 
pullRequest.getNextOffset();
                       log.info("the first time to pull message, so fix offset 
from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                           pullRequest, offset, brokerBusy);
                       if (brokerBusy) {
                           log.info("[NOTIFYME]the first time to pull message, 
but pull request offset larger than broker consume offset. pullRequest: {} 
NewOffset: {}",
                               pullRequest, offset);
                       }
   
                       pullRequest.setLockedFirst(true);
                       pullRequest.setNextOffset(offset);
                   }
               } else {
                   // Not locked: retry later without pulling.
                   this.executePullRequestLater(pullRequest, 
pullTimeDelayMillsWhenException);
                   log.info("pull message later because not locked in broker, 
{}", pullRequest);
                   return;
               }
           }
   
           // Subscription registered for this queue's topic; without one the pull is postponed.
           final SubscriptionData subscriptionData = 
this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
           if (null == subscriptionData) {
               this.executePullRequestLater(pullRequest, 
pullTimeDelayMillsWhenException);
               log.warn("find the consumer's subscription failed, {}", 
pullRequest);
               return;
           }
   
           // Captured by the callback to measure how long the pull took.
           final long beginTimestamp = System.currentTimeMillis();
   
           // Callback handed to pullKernelImpl below; it decides how the next pull is scheduled.
           PullCallback pullCallback = new PullCallback() {
               @Override
               public void onSuccess(PullResult pullResult) {
                   // A null result is ignored entirely: nothing is re-scheduled in that case.
                   if (pullResult != null) {
                       pullResult = 
DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(),
 pullResult,
                           subscriptionData);
   
                       switch (pullResult.getPullStatus()) {
                           case FOUND:
                               // Remember the offset that was requested, then advance the
                               // request to the next begin offset reported by the result.
                               long prevRequestOffset = 
pullRequest.getNextOffset();
                               
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                               long pullRT = System.currentTimeMillis() - 
beginTimestamp;
                               
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                   pullRequest.getMessageQueue().getTopic(), 
pullRT);
   
                               // Stays Long.MAX_VALUE when the found list is null or empty, so
                               // the sanity check below then depends only on nextBeginOffset.
                               long firstMsgOffset = Long.MAX_VALUE;
                               if (pullResult.getMsgFoundList() == null || 
pullResult.getMsgFoundList().isEmpty()) {
                                   
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                               } else {
                                   firstMsgOffset = 
pullResult.getMsgFoundList().get(0).getQueueOffset();
   
                                   
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                       
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
   
                                   // Cache the messages in the process queue, then hand them to
                                   // the consume service together with putMessage's return value.
                                   boolean dispatchToConsume = 
processQueue.putMessage(pullResult.getMsgFoundList());
                                   
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                       pullResult.getMsgFoundList(),
                                       processQueue,
                                       pullRequest.getMessageQueue(),
                                       dispatchToConsume);
   
                                   // A positive pull interval delays the next pull by that
                                   // amount; otherwise the next pull is submitted immediately.
                                   if 
(DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                       
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                           
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                   } else {
                                       
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                   }
                               }
   
                               // Sanity check (log only): offsets in the result should not lie
                               // before the offset that was requested.
                               if (pullResult.getNextBeginOffset() < 
prevRequestOffset
                                   || firstMsgOffset < prevRequestOffset) {
                                   log.warn(
                                       "[BUG] pull message result maybe data 
wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                       pullResult.getNextBeginOffset(),
                                       firstMsgOffset,
                                       prevRequestOffset);
                               }
   
                               break;
                           // Both statuses are handled identically: advance the offset, call
                           // correctTagsOffset, and pull again right away.
                           case NO_NEW_MSG:
                           case NO_MATCHED_MSG:
                               
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
   
                               
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
   
                               
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                               break;
                           case OFFSET_ILLEGAL:
                               log.warn("the pull request offset illegal, {} 
{}",
                                   pullRequest.toString(), 
pullResult.toString());
                               
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
   
                               // Mark the queue dropped now; the request is not re-submitted on
                               // this path. The stored offset is fixed by a task scheduled with
                               // a delay argument of 10000.
                               pullRequest.getProcessQueue().setDropped(true);
                               
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
   
                                   @Override
                                   public void run() {
                                       try {
                                           // Update and persist the corrected offset, then
                                           // remove the process queue from the rebalance impl.
                                           
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                               pullRequest.getNextOffset(), 
false);
   
                                           
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
   
                                           
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
   
                                           log.warn("fix the pull request 
offset, {}", pullRequest);
                                       } catch (Throwable e) {
                                           // Any failure in the delayed task is logged only.
                                           log.error("executeTaskLater 
Exception", e);
                                       }
                                   }
                               }, 10000);
                               break;
                           default:
                               break;
                       }
                   }
               }
   
               @Override
               public void onException(Throwable e) {
                   // The warning is skipped for topics starting with the retry-group prefix.
                   if 
(!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
 {
                       log.warn("execute the pull request exception", e);
                   }
   
                   // Whatever the topic, retry after pullTimeDelayMillsWhenException.
                   
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 
pullTimeDelayMillsWhenException);
               }
           };
   
           // In CLUSTERING mode, read the in-memory offset for this queue; the commit-offset
           // flag is set only when that value is greater than 0.
           boolean commitOffsetEnable = false;
           long commitOffsetValue = 0L;
           if (MessageModel.CLUSTERING == 
this.defaultMQPushConsumer.getMessageModel()) {
               commitOffsetValue = 
this.offsetStore.readOffset(pullRequest.getMessageQueue(), 
ReadOffsetType.READ_FROM_MEMORY);
               if (commitOffsetValue > 0) {
                   commitOffsetEnable = true;
               }
           }
   
           // NOTE(review): this repeats the lookup already stored in subscriptionData above
           // (same map, same topic key); looks redundant unless the map can change in between
           // - confirm before merging the two.
           String subExpression = null;
           boolean classFilter = false;
           SubscriptionData sd = 
this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
           if (sd != null) {
               // The sub string is sent only when postSubscriptionWhenPull is enabled and the
               // subscription is not in class-filter mode.
               if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && 
!sd.isClassFilterMode()) {
                   subExpression = sd.getSubString();
               }
   
               classFilter = sd.isClassFilterMode();
           }
   
           // Combine the options computed above into the flag value passed to pullKernelImpl.
           int sysFlag = PullSysFlag.buildSysFlag(
               commitOffsetEnable, // commitOffset
               true, // suspend
               subExpression != null, // subscription
               classFilter // class filter
           );
           // Start the asynchronous pull; results arrive through pullCallback. An exception
           // thrown by the call itself is logged and the request is retried later.
           try {
               this.pullAPIWrapper.pullKernelImpl(
                   pullRequest.getMessageQueue(),
                   subExpression,
                   subscriptionData.getExpressionType(),
                   subscriptionData.getSubVersion(),
                   pullRequest.getNextOffset(),
                   this.defaultMQPushConsumer.getPullBatchSize(),
                   sysFlag,
                   commitOffsetValue,
                   BROKER_SUSPEND_MAX_TIME_MILLIS,
                   CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                   CommunicationMode.ASYNC,
                   pullCallback
               );
           } catch (Exception e) {
               log.error("pullKernelImpl exception", e);
               this.executePullRequestLater(pullRequest, 
pullTimeDelayMillsWhenException);
           }
       }


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to