[ROCKETMQ-107] Fix possible concurrency problem on ServiceState when consumer 
start/shutdown, closes apache/incubator-rocketmq#68


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/f508f131
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/f508f131
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/f508f131

Branch: refs/heads/release-4.1.0-incubating
Commit: f508f131f7dee2bcb86e66a6beb7bbdedbe31bc6
Parents: c183e0d
Author: Jaskey <[email protected]>
Authored: Wed Apr 19 11:58:48 2017 +0800
Committer: yukon <[email protected]>
Committed: Wed Apr 19 11:58:48 2017 +0800

----------------------------------------------------------------------
 .../consumer/DefaultMQPullConsumerImpl.java     | 21 +++++++++++++-------
 .../consumer/DefaultMQPushConsumerImpl.java     | 15 +++++++-------
 2 files changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f508f131/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index b26d062..7d43b37 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -70,7 +70,7 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
     private final RPCHook rpcHook;
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new 
ArrayList<ConsumeMessageHook>();
     private final ArrayList<FilterMessageHook> filterMessageHookList = new 
ArrayList<FilterMessageHook>();
-    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
     private MQClientInstance mQClientFactory;
     private PullAPIWrapper pullAPIWrapper;
     private OffsetStore offsetStore;
@@ -161,7 +161,8 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
         return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, 
timeout);
     }
 
-    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, 
long offset, int maxNums, boolean block, long timeout)
+    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, 
long offset, int maxNums, boolean block,
+        long timeout)
         throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
         this.makeSureStateOK();
 
@@ -365,7 +366,8 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
         pull(mq, subExpression, offset, maxNums, pullCallback, 
this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
     }
 
-    public void pull(MessageQueue mq, String subExpression, long offset, int 
maxNums, PullCallback pullCallback, long timeout)
+    public void pull(MessageQueue mq, String subExpression, long offset, int 
maxNums, PullCallback pullCallback,
+        long timeout)
         throws MQClientException, RemotingException, InterruptedException {
         this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, 
false, timeout);
     }
@@ -449,7 +451,8 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
         return defaultMQPullConsumer;
     }
 
-    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, 
long offset, int maxNums, PullCallback pullCallback)
+    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, 
long offset, int maxNums,
+        PullCallback pullCallback)
         throws MQClientException, RemotingException, InterruptedException {
         this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, 
true,
             this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
@@ -510,7 +513,7 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
         }
     }
 
-    public void shutdown() {
+    public synchronized void shutdown() {
         switch (this.serviceState) {
             case CREATE_JUST:
                 break;
@@ -528,7 +531,7 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
         }
     }
 
-    public void start() throws MQClientException {
+    public synchronized void start() throws MQClientException {
         switch (this.serviceState) {
             case CREATE_JUST:
                 this.serviceState = ServiceState.START_FAILED;
@@ -593,6 +596,7 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
             default:
                 break;
         }
+
     }
 
     private void checkConfig() throws MQClientException {
@@ -662,7 +666,8 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
         this.offsetStore.updateOffset(mq, offset, false);
     }
 
-    public MessageExt viewMessage(String msgId) throws RemotingException, 
MQBrokerException, InterruptedException, MQClientException {
+    public MessageExt viewMessage(String msgId)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         this.makeSureStateOK();
         return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
     }
@@ -692,6 +697,8 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
         return serviceState;
     }
 
+    //Don't use this deprecated setter, which will be removed soon.
+    @Deprecated
     public void setServiceState(ServiceState serviceState) {
         this.serviceState = serviceState;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f508f131/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 4f33732..67f3ebe 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -97,7 +97,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
     private final long consumerStartTimestamp = System.currentTimeMillis();
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new 
ArrayList<ConsumeMessageHook>();
     private final RPCHook rpcHook;
-    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
     private MQClientInstance mQClientFactory;
     private PullAPIWrapper pullAPIWrapper;
     private volatile boolean pause = false;
@@ -515,7 +515,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         }
     }
 
-    public void shutdown() {
+    public synchronized void shutdown() {
         switch (this.serviceState) {
             case CREATE_JUST:
                 break;
@@ -535,7 +535,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         }
     }
 
-    public void start() throws MQClientException {
+    public synchronized void start() throws MQClientException {
         switch (this.serviceState) {
             case CREATE_JUST:
                 log.info("the consumer [{}] start beginning. messageModel={}, 
isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
@@ -615,9 +615,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         }
 
         this.updateTopicSubscribeInfoWhenSubscriptionChanged();
-
         this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-
         this.mQClientFactory.rebalanceImmediately();
     }
 
@@ -855,7 +853,8 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         this.consumeMessageService.updateCorePoolSize(corePoolSize);
     }
 
-    public MessageExt viewMessage(String msgId) throws RemotingException, 
MQBrokerException, InterruptedException, MQClientException {
+    public MessageExt viewMessage(String msgId)
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
     }
 
@@ -1014,7 +1013,9 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         return serviceState;
     }
 
-    public void setServiceState(ServiceState serviceState) {
+    //Don't use this deprecated setter, which will be removed soon.
+    @Deprecated
+    public synchronized void setServiceState(ServiceState serviceState) {
         this.serviceState = serviceState;
     }
 

Reply via email to