vongosling closed pull request #49: [ROCKETMQ-72] DefaultMessageStore cannot be
properly shutdown when fa…
URL: https://github.com/apache/rocketmq/pull/49
This is a PR merged from a forked repository.
As this is a foreign pull request (from a fork), GitHub hides the
original diff once it is merged, so the diff is reproduced below for
the sake of provenance:
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 2594ef323..044a6a144 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -216,38 +216,36 @@ public void start() throws Exception {
*/
public void shutdown() {
- if (!this.shutdown) {
- this.shutdown = true;
+ this.shutdown = true;
- this.scheduledExecutorService.shutdown();
+ this.scheduledExecutorService.shutdown();
- try {
+ try {
- Thread.sleep(1000 * 3);
- } catch (InterruptedException e) {
- log.error("shutdown Exception, ", e);
- }
+ Thread.sleep(1000 * 3);
+ } catch (InterruptedException e) {
+ log.error("shutdown Exception, ", e);
+ }
- if (this.scheduleMessageService != null) {
- this.scheduleMessageService.shutdown();
- }
+ if (this.scheduleMessageService != null) {
+ this.scheduleMessageService.shutdown();
+ }
- this.haService.shutdown();
+ this.haService.shutdown();
- this.storeStatsService.shutdown();
- this.indexService.shutdown();
- this.commitLog.shutdown();
- this.reputMessageService.shutdown();
- this.flushConsumeQueueService.shutdown();
- this.allocateMappedFileService.shutdown();
- this.storeCheckpoint.flush();
- this.storeCheckpoint.shutdown();
+ this.storeStatsService.shutdown();
+ this.indexService.shutdown();
+ this.commitLog.shutdown();
+ this.reputMessageService.shutdown();
+ this.flushConsumeQueueService.shutdown();
+ this.allocateMappedFileService.shutdown();
+ this.storeCheckpoint.flush();
+ this.storeCheckpoint.shutdown();
- if (this.runningFlags.isWriteable()) {
-
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
- } else {
- log.warn("the store may be wrong, so shutdown abnormally, and
keep abort file.");
- }
+ if (this.runningFlags.isWriteable()) {
+
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
+ } else {
+ log.warn("the store may be wrong, so shutdown abnormally, and keep
abort file.");
}
this.transientStorePool.destroy();
@@ -351,7 +349,8 @@ public CommitLog getCommitLog() {
return commitLog;
}
- public GetMessageResult getMessage(final String group, final String topic,
final int queueId, final long offset, final int maxMsgNums,
+ public GetMessageResult getMessage(final String group, final String topic,
final int queueId, final long offset,
+ final int maxMsgNums,
final SubscriptionData subscriptionData) {
if (this.shutdown) {
log.warn("message store has shutdown, so getMessage is forbidden");
@@ -715,7 +714,7 @@ public boolean appendToCommitLog(long startOffset, byte[]
data) {
@Override
public void excuteDeleteFilesManualy() {
- this.cleanCommitLogService.excuteDeleteFilesManualy();
+ this.cleanCommitLogService.executeDeleteFilesManualy();
}
@Override
@@ -870,8 +869,9 @@ public void cleanExpiredConsumerQueue() {
}
}
- public Map<String, Long> getMessageIds(final String topic, final int
queueId, long minOffset, long maxOffset, SocketAddress storeHost) {
- Map<String, Long> messageIds = new HashMap<String, Long>();
+ public Map<String, Long> getMessageIds(final String topic, final int
queueId, long minOffset, long maxOffset,
+ SocketAddress storeHost) {
+ Map<String, Long> messageIds = new HashMap<>();
if (this.shutdown) {
return messageIds;
}
@@ -979,7 +979,7 @@ public MessageExt lookMessageByOffset(long commitLogOffset,
int size) {
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentHashMap<Integer, ConsumeQueue> map =
consumeQueueTable.get(topic);
if (null == map) {
- ConcurrentHashMap<Integer, ConsumeQueue> newMap = new
ConcurrentHashMap<Integer, ConsumeQueue>(128);
+ ConcurrentHashMap<Integer, ConsumeQueue> newMap = new
ConcurrentHashMap<>(128);
ConcurrentHashMap<Integer, ConsumeQueue> oldMap =
consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
@@ -1198,7 +1198,7 @@ public TransientStorePool getTransientStorePool() {
private void putConsumeQueue(final String topic, final int queueId, final
ConsumeQueue consumeQueue) {
ConcurrentHashMap<Integer/* queueId */, ConsumeQueue> map =
this.consumeQueueTable.get(topic);
if (null == map) {
- map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>();
+ map = new ConcurrentHashMap<>();
map.put(queueId, consumeQueue);
this.consumeQueueTable.put(topic, map);
} else {
@@ -1215,7 +1215,7 @@ private void recoverConsumeQueue() {
}
private void recoverTopicQueueTable() {
- HashMap<String/* topic-queueid */, Long/* offset */> table = new
HashMap<String, Long>(1024);
+ HashMap<String/* topic-queueid */, Long/* offset */> table = new
HashMap<>(1024);
long minPhyOffset = this.commitLog.getMinOffset();
for (ConcurrentHashMap<Integer, ConsumeQueue> maps :
this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
@@ -1278,7 +1278,8 @@ public void doDispatch(DispatchRequest req) {
}
}
- public void putMessagePositionInfo(String topic, int queueId, long offset,
int size, long tagsCode, long storeTimestamp,
+ public void putMessagePositionInfo(String topic, int queueId, long offset,
int size, long tagsCode,
+ long storeTimestamp,
long logicOffset) {
ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
cq.putMessagePositionInfoWrapper(offset, size, tagsCode,
storeTimestamp, logicOffset);
@@ -1320,9 +1321,9 @@ public void run() {
private volatile boolean cleanImmediately = false;
- public void excuteDeleteFilesManualy() {
+ public void executeDeleteFilesManualy() {
this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
- DefaultMessageStore.log.info("excuteDeleteFilesManualy was
invoked");
+ DefaultMessageStore.log.info("executeDeleteFilesManualy was
invoked");
}
public void run() {
@@ -1339,7 +1340,7 @@ private void deleteExpiredFiles() {
int deleteCount = 0;
long fileReservedTime =
DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
int deletePhysicFilesInterval =
DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
- int destroyMapedFileIntervalForcibly =
DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
+ int destroyMappedFileIntervalForcibly =
DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
boolean timeup = this.isTimeToDelete();
boolean spacefull = this.isSpaceToDelete();
@@ -1362,7 +1363,7 @@ private void deleteExpiredFiles() {
fileReservedTime *= 60 * 60 * 1000;
deleteCount =
DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime,
deletePhysicFilesInterval,
- destroyMapedFileIntervalForcibly, cleanAtOnce);
+ destroyMappedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file
failed.");
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 762bdb6ad..b92b14723 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -187,8 +187,13 @@ public void beginAccept() throws Exception {
public void shutdown(final boolean interrupt) {
super.shutdown(interrupt);
try {
- this.serverSocketChannel.close();
- this.selector.close();
+ if (this.serverSocketChannel != null) {
+ this.serverSocketChannel.close();
+ }
+
+ if (this.selector != null) {
+ this.selector.close();
+ }
} catch (IOException e) {
log.error("AcceptSocketService shutdown exception", e);
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 5c9c46f7e..06e7fcb9f 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -26,9 +26,9 @@
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertTrue;
public class DefaultMessageStoreTest {
private final String StoreMessage = "Once, there was a chance for me!";
@@ -58,7 +58,7 @@ public void testWriteAndRead() throws Exception {
MessageStore master = new DefaultMessageStore(messageStoreConfig,
null, new MyMessageArrivingListener(), new BrokerConfig());
boolean load = master.load();
- assertTrue(load);
+ assertThat(load).isTrue();
master.start();
try {
@@ -102,7 +102,7 @@ public void testGroupCommit() throws Exception {
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
MessageStore master = new DefaultMessageStore(messageStoreConfig,
null, new MyMessageArrivingListener(), new BrokerConfig());
boolean load = master.load();
- assertTrue(load);
+ assertThat(load).isTrue();
master.start();
try {
@@ -117,6 +117,7 @@ public void testGroupCommit() throws Exception {
}
} finally {
+ master.getRunningDataInfo();
master.shutdown();
master.destroy();
}
@@ -127,4 +128,28 @@ public void testGroupCommit() throws Exception {
public void arriving(String topic, int queueId, long logicOffset, long
tagsCode) {
}
}
+
+ @Test
+ public void testFailureOnStart() throws Exception {
+ MessageStore master = new DefaultMessageStore(
+ new MessageStoreConfig(), null, new MyMessageArrivingListener(),
new BrokerConfig());
+ MessageStore masterSpy = Mockito.spy(master);
+
+ Mockito.doThrow(new Exception("Start operation
failed")).when(masterSpy).start();
+
+ boolean load = masterSpy.load();
+ assertThat(load).isTrue();
+
+ try {
+ masterSpy.start();
+ } catch (Exception ignored) {
+ } finally {
+ // shutdown, no messages are accepted
+ assertThat(masterSpy.putMessage(buildMessage()).isOk()).isFalse();
+
+ // shutdown without errors
+ masterSpy.shutdown();
+ masterSpy.destroy();
+ }
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services