vongosling closed pull request #64: [ROCKETMQ-102] When shutdown(), the
persisted offet is not the latest consumed message, which may cause repeated
messages.
URL: https://github.com/apache/rocketmq/pull/64
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 2cce03d34..1776a54a0 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -214,6 +214,11 @@
*/
private long consumeTimeout = 15;
+ /**
+ * Maximum time to await message consuming when shutdown consumer, 0
indicates no await.
+ */
+ private long awaitTerminationMillisWhenShutdown = 0;
+
/**
* Default constructor.
*/
@@ -461,7 +466,7 @@ public void start() throws MQClientException {
*/
@Override
public void shutdown() {
- this.defaultMQPushConsumerImpl.shutdown();
+
this.defaultMQPushConsumerImpl.shutdown(awaitTerminationMillisWhenShutdown);
}
@Override
@@ -616,4 +621,12 @@ public long getConsumeTimeout() {
public void setConsumeTimeout(final long consumeTimeout) {
this.consumeTimeout = consumeTimeout;
}
+
+ public long getAwaitTerminationMillisWhenShutdown() {
+ return awaitTerminationMillisWhenShutdown;
+ }
+
+ public void setAwaitTerminationMillisWhenShutdown(long
awaitTerminationMillisWhenShutdown) {
+ this.awaitTerminationMillisWhenShutdown =
awaitTerminationMillisWhenShutdown;
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index f566ed0fc..ffb48ccae 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -45,6 +45,7 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
@@ -92,9 +93,15 @@ public void run() {
}, this.defaultMQPushConsumer.getConsumeTimeout(),
this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
- public void shutdown() {
+ @Override
+ public void shutdown(long awaitTerminateMillis) {
this.scheduledExecutorService.shutdown();
this.consumeExecutor.shutdown();
+
+ if (!ThreadUtils.terminateExecutor(consumeExecutor,
awaitTerminateMillis)) {
+ log.info("There are messages still being consumed in thread pool,
but not going to await them anymore. Have awaited for {} ms",
awaitTerminateMillis);
+ }
+
this.cleanExpireMsgExecutors.shutdown();
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 1fa474caa..ee8d1883f 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -45,6 +45,7 @@
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
@@ -62,7 +63,8 @@
private final ScheduledExecutorService scheduledExecutorService;
private volatile boolean stopped = false;
- public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl
defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
+ public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl
defaultMQPushConsumerImpl,
+ MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
@@ -92,10 +94,15 @@ public void run() {
}
}
- public void shutdown() {
+ @Override
+ public void shutdown(long awaitTerminateMillis) {
this.stopped = true;
this.scheduledExecutorService.shutdown();
- this.consumeExecutor.shutdown();
+
+ if (!ThreadUtils.terminateExecutor(consumeExecutor,
awaitTerminateMillis)) {
+ log.info("There are messages still being consumed in thread pool,
but not going to await them anymore. Have awaited for {} ms",
awaitTerminateMillis);
+ }
+
if
(MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel()))
{
this.unlockAllMQ();
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
index 8742191b5..ab4448bc4 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
@@ -24,7 +24,7 @@
public interface ConsumeMessageService {
void start();
- void shutdown();
+ void shutdown(long awaitTerminateMillis);
void updateCorePoolSize(int corePoolSize);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 4f33732dd..d91adf1c1 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -515,12 +515,12 @@ private int getMaxReconsumeTimes() {
}
}
- public void shutdown() {
+ public void shutdown(long awaitTerminateMillis) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
- this.consumeMessageService.shutdown();
+ this.consumeMessageService.shutdown(awaitTerminateMillis);
this.persistConsumerOffset();
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
@@ -593,7 +593,7 @@ public void start() throws MQClientException {
boolean registerOK =
mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(),
this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
- this.consumeMessageService.shutdown();
+
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" +
this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name
please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
diff --git
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 2e0af5aff..d66ed1e8c 100644
---
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -21,6 +21,7 @@
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -31,6 +32,7 @@
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
@@ -135,6 +137,7 @@ public void init() throws Exception {
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
+
messageClientExt.setQueueOffset(((PullMessageRequestHeader)mock.getArgument(1)).getQueueOffset());
PullResult pullResult = createPullResult(requestHeader,
PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback)mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
@@ -174,6 +177,41 @@ public void testPullMessage_Success() throws
InterruptedException, RemotingExcep
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}
+ @Test
+ public void testShutdownAwait() throws Exception {
+ final LinkedList<Long> consumedOffset = new LinkedList<>();
+ pushConsumer.setPullInterval(0);
+ pushConsumer.setPullThresholdForQueue(100);
+ pushConsumer.setConsumeThreadMin(2);
+ pushConsumer.setConsumeThreadMax(10);
+ pushConsumer.setAwaitTerminationMillisWhenShutdown(10* 1000);//await
consume for at most 10 seconds. If we do not set await millis, this test case
will not pass
+
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new
ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(),
new MessageListenerConcurrently() {
+ @Override public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext context) {
+ for (MessageExt msg : msgs) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ synchronized (consumedOffset) {
+ consumedOffset.add(msg.getQueueOffset());
+ }
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ }));
+ pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
+ PullMessageService pullMessageService =
mQClientFactory.getPullMessageService();
+ pullMessageService.executePullRequestImmediately(createPullRequest());
+ Thread.sleep(500);
+ pushConsumer.shutdown();
+ long persitOffset
=pushConsumer.getDefaultMQPushConsumerImpl().getOffsetStore().readOffset(new
MessageQueue(topic, brokerName, 0), ReadOffsetType.READ_FROM_MEMORY);
+ Thread.sleep(500);//wait for thread pool to continue consume for some
time, which will emerge problem if thread pool is not terminated well
+ Collections.sort(consumedOffset);
+ assertThat(consumedOffset.getLast() +
1).isEqualTo(persitOffset);//when shutdown with await, the persisted offset
should be the latest message offset
+ }
+
@Test
public void testPullMessage_SuccessWithOrderlyService() throws Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
diff --git
a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
index 8c28d7002..66d550473 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
@@ -25,6 +25,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -164,6 +165,49 @@ public static void shutdownGracefully(ExecutorService
executor, long timeout, Ti
}
}
+
+
+ /**
+ * Shutdown an terminateExecutor and block util tasks are completed or the
timeout occurs, or the current thread is
+ * interrupted, whichever happens first and return whether the executor is
terminated
+ *
+ * @param executor ExecutorService to shutdown
+ * @param timeout Timeout milliseconds to await termination
+ * @return Whether the executor is terminated
+ * @throws InterruptedException if interrupted while waiting termination
interrupted
+ */
+ public static boolean terminateExecutorInterruptibly(ExecutorService
executor,
+ long timeout) throws InterruptedException {
+ executor.shutdown();
+ //await to consume
+ new ReentrantReadWriteLock().readLock().lock();
+ if (timeout > 0) {
+ executor.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+ }
+ return executor.isTerminated();
+ }
+
+ /**
+ * Shutdown an terminateExecutor and block util tasks are completed or the
timeout occurs, or the current thread is
+ * interrupted, whichever happens first and return whether the executor is
terminated already
+ *
+ * If current thread is interrupted, no InterruptedException will be
thrown but remains the interrupt flag
+ *
+ * @param executor ExecutorService to shutdown
+ * @param timeout Timeout milliseconds to await termination
+ * @return Whether the executor is terminated
+ */
+ public static boolean terminateExecutor(ExecutorService executor, long
timeout) {
+ try {
+ terminateExecutorInterruptibly(executor, timeout);
+ } catch (InterruptedException e) {
+ log.warn("got InterruptedException when awaitTermination. {}",
executor);
+ Thread.currentThread().interrupt(); //catch InterruptedException
and interrupt current thread
+ }
+ return executor.isTerminated();
+ }
+
+
/**
* A constructor to stop this class being constructed.
*/
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services