This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 713bb98 Refactor the protection logic when pulling
new b7b9f1e Merge branch 'develop' of github.com:apache/rocketmq into
develop
713bb98 is described below
commit 713bb9883b687cc18de388d92fe0c9cfd830cc04
Author: vongosling <[email protected]>
AuthorDate: Wed Jul 31 11:41:40 2019 +0800
Refactor the protection logic when pulling
---
.../impl/consumer/DefaultMQPullConsumerImpl.java | 36 +++++++++++-----------
1 file changed, 18 insertions(+), 18 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 8aff14b..d484298 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -94,13 +94,13 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
}
public void createTopic(String key, String newTopic, int queueNum, int
topicSysFlag) throws MQClientException {
- this.makeSureStateOK();
+ this.isRunning();
this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic,
queueNum, topicSysFlag);
}
- private void makeSureStateOK() throws MQClientException {
+ private void isRunning() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
- throw new MQClientException("The consumer service state not OK, "
+ throw new MQClientException("The consumer is not in running
status, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
@@ -108,12 +108,12 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
}
public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws
MQClientException {
- this.makeSureStateOK();
+ this.isRunning();
return this.offsetStore.readOffset(mq, fromStore ?
ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
}
public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws
MQClientException {
- this.makeSureStateOK();
+ this.isRunning();
if (null == topic) {
throw new IllegalArgumentException("topic is null");
}
@@ -130,12 +130,12 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
}
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws
MQClientException {
- this.makeSureStateOK();
+ this.isRunning();
return
this.mQClientFactory.getMQAdminImpl().fetchPublishMessageQueues(topic);
}
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws
MQClientException {
- this.makeSureStateOK();
+ this.isRunning();
// check if has info in memory, otherwise invoke api.
Set<MessageQueue> result =
this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
if (null == result) {
@@ -156,17 +156,17 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
}
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException
{
- this.makeSureStateOK();
+ this.isRunning();
return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
}
public long maxOffset(MessageQueue mq) throws MQClientException {
- this.makeSureStateOK();
+ this.isRunning();
return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
}
public long minOffset(MessageQueue mq) throws MQClientException {
- this.makeSureStateOK();
+ this.isRunning();
return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
}
@@ -225,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData
subscriptionData, long offset, int maxNums, boolean block,
long timeout)
throws MQClientException, RemotingException, MQBrokerException,
InterruptedException {
- this.makeSureStateOK();
+ this.isRunning();
if (null == mq) {
throw new MQClientException("mq is null", null);
@@ -383,7 +383,7 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
@Override
public void persistConsumerOffset() {
try {
- this.makeSureStateOK();
+ this.isRunning();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq =
this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
@@ -466,7 +466,7 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
final PullCallback pullCallback,
final boolean block,
final long timeout) throws MQClientException, RemotingException,
InterruptedException {
- this.makeSureStateOK();
+ this.isRunning();
if (null == mq) {
throw new MQClientException("mq is null", null);
@@ -543,18 +543,18 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
public QueryResult queryMessage(String topic, String key, int maxNum, long
begin, long end)
throws MQClientException, InterruptedException {
- this.makeSureStateOK();
+ this.isRunning();
return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key,
maxNum, begin, end);
}
public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
throws MQClientException, InterruptedException {
- this.makeSureStateOK();
+ this.isRunning();
return
this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
}
public long searchOffset(MessageQueue mq, long timestamp) throws
MQClientException {
- this.makeSureStateOK();
+ this.isRunning();
return this.mQClientFactory.getMQAdminImpl().searchOffset(mq,
timestamp);
}
@@ -748,13 +748,13 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
}
public void updateConsumeOffset(MessageQueue mq, long offset) throws
MQClientException {
- this.makeSureStateOK();
+ this.isRunning();
this.offsetStore.updateOffset(mq, offset, false);
}
public MessageExt viewMessage(String msgId)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
- this.makeSureStateOK();
+ this.isRunning();
return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
}