This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 713bb98  Refactor the protection logic when pulling
     new b7b9f1e  Merge branch 'develop' of github.com:apache/rocketmq into develop
713bb98 is described below

commit 713bb9883b687cc18de388d92fe0c9cfd830cc04
Author: vongosling <[email protected]>
AuthorDate: Wed Jul 31 11:41:40 2019 +0800

    Refactor the protection logic when pulling
---
 .../impl/consumer/DefaultMQPullConsumerImpl.java   | 36 +++++++++++-----------
 1 file changed, 18 insertions(+), 18 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 8aff14b..d484298 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -94,13 +94,13 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
     }
 
     public void createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag) throws MQClientException {
-        this.makeSureStateOK();
+        this.isRunning();
         this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, 
queueNum, topicSysFlag);
     }
 
-    private void makeSureStateOK() throws MQClientException {
+    private void isRunning() throws MQClientException {
         if (this.serviceState != ServiceState.RUNNING) {
-            throw new MQClientException("The consumer service state not OK, "
+            throw new MQClientException("The consumer is not in running 
status, "
                 + this.serviceState
                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                 null);
@@ -108,12 +108,12 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
     }
 
     public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws 
MQClientException {
-        this.makeSureStateOK();
+        this.isRunning();
         return this.offsetStore.readOffset(mq, fromStore ? 
ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
     }
 
     public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws 
MQClientException {
-        this.makeSureStateOK();
+        this.isRunning();
         if (null == topic) {
             throw new IllegalArgumentException("topic is null");
         }
@@ -130,12 +130,12 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
     }
 
     public List<MessageQueue> fetchPublishMessageQueues(String topic) throws 
MQClientException {
-        this.makeSureStateOK();
+        this.isRunning();
         return 
this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
     }
 
     public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws 
MQClientException {
-        this.makeSureStateOK();
+        this.isRunning();
         // check if has info in memory, otherwise invoke api.
         Set<MessageQueue> result = 
this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
         if (null == result) {
@@ -156,17 +156,17 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
     }
 
     public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException 
{
-        this.makeSureStateOK();
+        this.isRunning();
         return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
     }
 
     public long maxOffset(MessageQueue mq) throws MQClientException {
-        this.makeSureStateOK();
+        this.isRunning();
         return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
     }
 
     public long minOffset(MessageQueue mq) throws MQClientException {
-        this.makeSureStateOK();
+        this.isRunning();
         return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
     }
 
@@ -225,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
     private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData 
subscriptionData, long offset, int maxNums, boolean block,
         long timeout)
         throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
-        this.makeSureStateOK();
+        this.isRunning();
 
         if (null == mq) {
             throw new MQClientException("mq is null", null);
@@ -383,7 +383,7 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
     @Override
     public void persistConsumerOffset() {
         try {
-            this.makeSureStateOK();
+            this.isRunning();
             Set<MessageQueue> mqs = new HashSet<MessageQueue>();
             Set<MessageQueue> allocateMq = 
this.rebalanceImpl.getProcessQueueTable().keySet();
             mqs.addAll(allocateMq);
@@ -466,7 +466,7 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
         final PullCallback pullCallback,
         final boolean block,
         final long timeout) throws MQClientException, RemotingException, 
InterruptedException {
-        this.makeSureStateOK();
+        this.isRunning();
 
         if (null == mq) {
             throw new MQClientException("mq is null", null);
@@ -543,18 +543,18 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
 
     public QueryResult queryMessage(String topic, String key, int maxNum, long 
begin, long end)
         throws MQClientException, InterruptedException {
-        this.makeSureStateOK();
+        this.isRunning();
         return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, 
maxNum, begin, end);
     }
 
     public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
         throws MQClientException, InterruptedException {
-        this.makeSureStateOK();
+        this.isRunning();
         return 
this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
     }
 
     public long searchOffset(MessageQueue mq, long timestamp) throws 
MQClientException {
-        this.makeSureStateOK();
+        this.isRunning();
         return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, 
timestamp);
     }
 
@@ -748,13 +748,13 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
     }
 
     public void updateConsumeOffset(MessageQueue mq, long offset) throws 
MQClientException {
-        this.makeSureStateOK();
+        this.isRunning();
         this.offsetStore.updateOffset(mq, offset, false);
     }
 
     public MessageExt viewMessage(String msgId)
         throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        this.makeSureStateOK();
+        this.isRunning();
         return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
     }
 

Reply via email to