vongosling closed pull request #64: [ROCKETMQ-102] When shutdown(), the 
persisted offet is not the latest consumed message, which may cause repeated 
messages.
URL: https://github.com/apache/rocketmq/pull/64
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 2cce03d34..1776a54a0 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -214,6 +214,11 @@
      */
     private long consumeTimeout = 15;
 
+    /**
+     * Maximum time to await message consuming when shutdown consumer, 0 
indicates no await.
+     */
+    private long awaitTerminationMillisWhenShutdown = 0;
+
     /**
      * Default constructor.
      */
@@ -461,7 +466,7 @@ public void start() throws MQClientException {
      */
     @Override
     public void shutdown() {
-        this.defaultMQPushConsumerImpl.shutdown();
+        
this.defaultMQPushConsumerImpl.shutdown(awaitTerminationMillisWhenShutdown);
     }
 
     @Override
@@ -616,4 +621,12 @@ public long getConsumeTimeout() {
     public void setConsumeTimeout(final long consumeTimeout) {
         this.consumeTimeout = consumeTimeout;
     }
+
+    public long getAwaitTerminationMillisWhenShutdown() {
+        return awaitTerminationMillisWhenShutdown;
+    }
+
+    public void setAwaitTerminationMillisWhenShutdown(long 
awaitTerminationMillisWhenShutdown) {
+        this.awaitTerminationMillisWhenShutdown = 
awaitTerminationMillisWhenShutdown;
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index f566ed0fc..ffb48ccae 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -45,6 +45,7 @@
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.CMResult;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.slf4j.Logger;
 
@@ -92,9 +93,15 @@ public void run() {
         }, this.defaultMQPushConsumer.getConsumeTimeout(), 
this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
     }
 
-    public void shutdown() {
+    @Override
+    public void shutdown(long awaitTerminateMillis) {
         this.scheduledExecutorService.shutdown();
         this.consumeExecutor.shutdown();
+
+        if (!ThreadUtils.terminateExecutor(consumeExecutor, 
awaitTerminateMillis)) {
+            log.info("There are messages still being consumed in thread pool, 
but not going to await them anymore. Have awaited for {} ms", 
awaitTerminateMillis);
+        }
+
         this.cleanExpireMsgExecutors.shutdown();
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 1fa474caa..ee8d1883f 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -45,6 +45,7 @@
 import org.apache.rocketmq.common.protocol.body.CMResult;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.slf4j.Logger;
 
@@ -62,7 +63,8 @@
     private final ScheduledExecutorService scheduledExecutorService;
     private volatile boolean stopped = false;
 
-    public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl 
defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
+    public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl 
defaultMQPushConsumerImpl,
+        MessageListenerOrderly messageListener) {
         this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
         this.messageListener = messageListener;
 
@@ -92,10 +94,15 @@ public void run() {
         }
     }
 
-    public void shutdown() {
+    @Override
+    public void shutdown(long awaitTerminateMillis) {
         this.stopped = true;
         this.scheduledExecutorService.shutdown();
-        this.consumeExecutor.shutdown();
+
+        if (!ThreadUtils.terminateExecutor(consumeExecutor, 
awaitTerminateMillis)) {
+            log.info("There are messages still being consumed in thread pool, 
but not going to await them anymore. Have awaited for {} ms", 
awaitTerminateMillis);
+        }
+
         if 
(MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) 
{
             this.unlockAllMQ();
         }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
index 8742191b5..ab4448bc4 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
@@ -24,7 +24,7 @@
 public interface ConsumeMessageService {
     void start();
 
-    void shutdown();
+    void shutdown(long awaitTerminateMillis);
 
     void updateCorePoolSize(int corePoolSize);
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 4f33732dd..d91adf1c1 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -515,12 +515,12 @@ private int getMaxReconsumeTimes() {
         }
     }
 
-    public void shutdown() {
+    public void shutdown(long awaitTerminateMillis) {
         switch (this.serviceState) {
             case CREATE_JUST:
                 break;
             case RUNNING:
-                this.consumeMessageService.shutdown();
+                this.consumeMessageService.shutdown(awaitTerminateMillis);
                 this.persistConsumerOffset();
                 
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
                 this.mQClientFactory.shutdown();
@@ -593,7 +593,7 @@ public void start() throws MQClientException {
                 boolean registerOK = 
mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), 
this);
                 if (!registerOK) {
                     this.serviceState = ServiceState.CREATE_JUST;
-                    this.consumeMessageService.shutdown();
+                    
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                     throw new MQClientException("The consumer group[" + 
this.defaultMQPushConsumer.getConsumerGroup()
                         + "] has been created before, specify another name 
please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                         null);
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 2e0af5aff..d66ed1e8c 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -21,6 +21,7 @@
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -31,6 +32,7 @@
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
@@ -135,6 +137,7 @@ public void init() throws Exception {
                 messageClientExt.setOffsetMsgId("234");
                 messageClientExt.setBornHost(new InetSocketAddress(8080));
                 messageClientExt.setStoreHost(new InetSocketAddress(8080));
+                
messageClientExt.setQueueOffset(((PullMessageRequestHeader)mock.getArgument(1)).getQueueOffset());
                 PullResult pullResult = createPullResult(requestHeader, 
PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
                 ((PullCallback)mock.getArgument(4)).onSuccess(pullResult);
                 return pullResult;
@@ -174,6 +177,41 @@ public void testPullMessage_Success() throws 
InterruptedException, RemotingExcep
         assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
     }
 
+    @Test
+    public void testShutdownAwait() throws Exception {
+        final LinkedList<Long> consumedOffset = new LinkedList<>();
+        pushConsumer.setPullInterval(0);
+        pushConsumer.setPullThresholdForQueue(100);
+        pushConsumer.setConsumeThreadMin(2);
+        pushConsumer.setConsumeThreadMax(10);
+        pushConsumer.setAwaitTerminationMillisWhenShutdown(10* 1000);//await 
consume for at most 10 seconds. If we do not set await millis, this test case 
will not pass
+        
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new 
ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), 
new MessageListenerConcurrently() {
+            @Override public ConsumeConcurrentlyStatus 
consumeMessage(List<MessageExt> msgs,
+                ConsumeConcurrentlyContext context) {
+                for (MessageExt msg : msgs) {
+                    try {
+                        Thread.sleep(50);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    synchronized (consumedOffset) {
+                        consumedOffset.add(msg.getQueueOffset());
+                    }
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        }));
+        pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
+        PullMessageService pullMessageService = 
mQClientFactory.getPullMessageService();
+        pullMessageService.executePullRequestImmediately(createPullRequest());
+        Thread.sleep(500);
+        pushConsumer.shutdown();
+        long persitOffset 
=pushConsumer.getDefaultMQPushConsumerImpl().getOffsetStore().readOffset(new 
MessageQueue(topic, brokerName, 0), ReadOffsetType.READ_FROM_MEMORY);
+        Thread.sleep(500);//wait for thread pool to continue consume for some 
time, which will emerge problem if thread pool is not terminated well
+        Collections.sort(consumedOffset);
+        assertThat(consumedOffset.getLast() + 
1).isEqualTo(persitOffset);//when shutdown with await, the persisted offset 
should be the latest message offset
+    }
+
     @Test
     public void testPullMessage_SuccessWithOrderlyService() throws Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java 
b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
index 8c28d7002..66d550473 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -164,6 +165,49 @@ public static void shutdownGracefully(ExecutorService 
executor, long timeout, Ti
         }
     }
 
+
+
+    /**
+     * Shutdown an terminateExecutor and block util tasks are completed or the 
timeout occurs, or the current thread is
+     * interrupted, whichever happens first and return whether the executor is 
terminated
+     *
+     * @param executor ExecutorService to shutdown
+     * @param timeout Timeout milliseconds to await termination
+     * @return Whether the executor is terminated
+     * @throws InterruptedException if interrupted while waiting termination 
interrupted
+     */
+    public static boolean terminateExecutorInterruptibly(ExecutorService 
executor,
+        long timeout) throws InterruptedException {
+        executor.shutdown();
+        //await to consume
+        new ReentrantReadWriteLock().readLock().lock();
+        if (timeout > 0) {
+            executor.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown an terminateExecutor and block util tasks are completed or the 
timeout occurs, or the current thread is
+     * interrupted, whichever happens first and return whether the executor is 
terminated already
+     *
+     * If current thread is interrupted, no InterruptedException will be 
thrown but remains the interrupt flag
+     *
+     * @param executor ExecutorService to shutdown
+     * @param timeout Timeout milliseconds to await termination
+     * @return Whether the executor is terminated
+     */
+    public static boolean terminateExecutor(ExecutorService executor, long 
timeout) {
+        try {
+            terminateExecutorInterruptibly(executor, timeout);
+        } catch (InterruptedException e) {
+            log.warn("got InterruptedException when awaitTermination. {}", 
executor);
+            Thread.currentThread().interrupt(); //catch InterruptedException 
and interrupt current thread
+        }
+        return executor.isTerminated();
+    }
+
+
     /**
      * A constructor to stop this class being constructed.
      */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to