vongosling closed pull request #49: [ROCKETMQ-72] DefaultMessageStore cannot be 
properly shutdown when fa…
URL: https://github.com/apache/rocketmq/pull/49
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once it has been merged, so the diff is supplied below:

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 2594ef323..044a6a144 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -216,38 +216,36 @@ public void start() throws Exception {
 
      */
     public void shutdown() {
-        if (!this.shutdown) {
-            this.shutdown = true;
+        this.shutdown = true;
 
-            this.scheduledExecutorService.shutdown();
+        this.scheduledExecutorService.shutdown();
 
-            try {
+        try {
 
-                Thread.sleep(1000 * 3);
-            } catch (InterruptedException e) {
-                log.error("shutdown Exception, ", e);
-            }
+            Thread.sleep(1000 * 3);
+        } catch (InterruptedException e) {
+            log.error("shutdown Exception, ", e);
+        }
 
-            if (this.scheduleMessageService != null) {
-                this.scheduleMessageService.shutdown();
-            }
+        if (this.scheduleMessageService != null) {
+            this.scheduleMessageService.shutdown();
+        }
 
-            this.haService.shutdown();
+        this.haService.shutdown();
 
-            this.storeStatsService.shutdown();
-            this.indexService.shutdown();
-            this.commitLog.shutdown();
-            this.reputMessageService.shutdown();
-            this.flushConsumeQueueService.shutdown();
-            this.allocateMappedFileService.shutdown();
-            this.storeCheckpoint.flush();
-            this.storeCheckpoint.shutdown();
+        this.storeStatsService.shutdown();
+        this.indexService.shutdown();
+        this.commitLog.shutdown();
+        this.reputMessageService.shutdown();
+        this.flushConsumeQueueService.shutdown();
+        this.allocateMappedFileService.shutdown();
+        this.storeCheckpoint.flush();
+        this.storeCheckpoint.shutdown();
 
-            if (this.runningFlags.isWriteable()) {
-                
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
-            } else {
-                log.warn("the store may be wrong, so shutdown abnormally, and 
keep abort file.");
-            }
+        if (this.runningFlags.isWriteable()) {
+            
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
+        } else {
+            log.warn("the store may be wrong, so shutdown abnormally, and keep 
abort file.");
         }
 
         this.transientStorePool.destroy();
@@ -351,7 +349,8 @@ public CommitLog getCommitLog() {
         return commitLog;
     }
 
-    public GetMessageResult getMessage(final String group, final String topic, 
final int queueId, final long offset, final int maxMsgNums,
+    public GetMessageResult getMessage(final String group, final String topic, 
final int queueId, final long offset,
+        final int maxMsgNums,
         final SubscriptionData subscriptionData) {
         if (this.shutdown) {
             log.warn("message store has shutdown, so getMessage is forbidden");
@@ -715,7 +714,7 @@ public boolean appendToCommitLog(long startOffset, byte[] data) {
 
     @Override
     public void excuteDeleteFilesManualy() {
-        this.cleanCommitLogService.excuteDeleteFilesManualy();
+        this.cleanCommitLogService.executeDeleteFilesManualy();
     }
 
     @Override
@@ -870,8 +869,9 @@ public void cleanExpiredConsumerQueue() {
         }
     }
 
-    public Map<String, Long> getMessageIds(final String topic, final int 
queueId, long minOffset, long maxOffset, SocketAddress storeHost) {
-        Map<String, Long> messageIds = new HashMap<String, Long>();
+    public Map<String, Long> getMessageIds(final String topic, final int 
queueId, long minOffset, long maxOffset,
+        SocketAddress storeHost) {
+        Map<String, Long> messageIds = new HashMap<>();
         if (this.shutdown) {
             return messageIds;
         }
@@ -979,7 +979,7 @@ public MessageExt lookMessageByOffset(long commitLogOffset, int size) {
     public ConsumeQueue findConsumeQueue(String topic, int queueId) {
         ConcurrentHashMap<Integer, ConsumeQueue> map = 
consumeQueueTable.get(topic);
         if (null == map) {
-            ConcurrentHashMap<Integer, ConsumeQueue> newMap = new 
ConcurrentHashMap<Integer, ConsumeQueue>(128);
+            ConcurrentHashMap<Integer, ConsumeQueue> newMap = new 
ConcurrentHashMap<>(128);
             ConcurrentHashMap<Integer, ConsumeQueue> oldMap = 
consumeQueueTable.putIfAbsent(topic, newMap);
             if (oldMap != null) {
                 map = oldMap;
@@ -1198,7 +1198,7 @@ public TransientStorePool getTransientStorePool() {
     private void putConsumeQueue(final String topic, final int queueId, final 
ConsumeQueue consumeQueue) {
         ConcurrentHashMap<Integer/* queueId */, ConsumeQueue> map = 
this.consumeQueueTable.get(topic);
         if (null == map) {
-            map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>();
+            map = new ConcurrentHashMap<>();
             map.put(queueId, consumeQueue);
             this.consumeQueueTable.put(topic, map);
         } else {
@@ -1215,7 +1215,7 @@ private void recoverConsumeQueue() {
     }
 
     private void recoverTopicQueueTable() {
-        HashMap<String/* topic-queueid */, Long/* offset */> table = new 
HashMap<String, Long>(1024);
+        HashMap<String/* topic-queueid */, Long/* offset */> table = new 
HashMap<>(1024);
         long minPhyOffset = this.commitLog.getMinOffset();
         for (ConcurrentHashMap<Integer, ConsumeQueue> maps : 
this.consumeQueueTable.values()) {
             for (ConsumeQueue logic : maps.values()) {
@@ -1278,7 +1278,8 @@ public void doDispatch(DispatchRequest req) {
         }
     }
 
-    public void putMessagePositionInfo(String topic, int queueId, long offset, 
int size, long tagsCode, long storeTimestamp,
+    public void putMessagePositionInfo(String topic, int queueId, long offset, 
int size, long tagsCode,
+        long storeTimestamp,
         long logicOffset) {
         ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
         cq.putMessagePositionInfoWrapper(offset, size, tagsCode, 
storeTimestamp, logicOffset);
@@ -1320,9 +1321,9 @@ public void run() {
 
         private volatile boolean cleanImmediately = false;
 
-        public void excuteDeleteFilesManualy() {
+        public void executeDeleteFilesManualy() {
             this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
-            DefaultMessageStore.log.info("excuteDeleteFilesManualy was 
invoked");
+            DefaultMessageStore.log.info("executeDeleteFilesManualy was 
invoked");
         }
 
         public void run() {
@@ -1339,7 +1340,7 @@ private void deleteExpiredFiles() {
             int deleteCount = 0;
             long fileReservedTime = 
DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
             int deletePhysicFilesInterval = 
DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
-            int destroyMapedFileIntervalForcibly = 
DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
+            int destroyMappedFileIntervalForcibly = 
DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
 
             boolean timeup = this.isTimeToDelete();
             boolean spacefull = this.isSpaceToDelete();
@@ -1362,7 +1363,7 @@ private void deleteExpiredFiles() {
                 fileReservedTime *= 60 * 60 * 1000;
 
                 deleteCount = 
DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, 
deletePhysicFilesInterval,
-                    destroyMapedFileIntervalForcibly, cleanAtOnce);
+                    destroyMappedFileIntervalForcibly, cleanAtOnce);
                 if (deleteCount > 0) {
                 } else if (spacefull) {
                     log.warn("disk space will be full soon, but delete file 
failed.");
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 762bdb6ad..b92b14723 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -187,8 +187,13 @@ public void beginAccept() throws Exception {
         public void shutdown(final boolean interrupt) {
             super.shutdown(interrupt);
             try {
-                this.serverSocketChannel.close();
-                this.selector.close();
+                if (this.serverSocketChannel != null) {
+                    this.serverSocketChannel.close();
+                }
+
+                if (this.selector != null) {
+                    this.selector.close();
+                }
             } catch (IOException e) {
                 log.error("AcceptSocketService shutdown exception", e);
             }
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 5c9c46f7e..06e7fcb9f 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -26,9 +26,9 @@
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertTrue;
 
 public class DefaultMessageStoreTest {
     private final String StoreMessage = "Once, there was a chance for me!";
@@ -58,7 +58,7 @@ public void testWriteAndRead() throws Exception {
         MessageStore master = new DefaultMessageStore(messageStoreConfig, 
null, new MyMessageArrivingListener(), new BrokerConfig());
 
         boolean load = master.load();
-        assertTrue(load);
+        assertThat(load).isTrue();
 
         master.start();
         try {
@@ -102,7 +102,7 @@ public void testGroupCommit() throws Exception {
         messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
         MessageStore master = new DefaultMessageStore(messageStoreConfig, 
null, new MyMessageArrivingListener(), new BrokerConfig());
         boolean load = master.load();
-        assertTrue(load);
+        assertThat(load).isTrue();
 
         master.start();
         try {
@@ -117,6 +117,7 @@ public void testGroupCommit() throws Exception {
 
             }
         } finally {
+            master.getRunningDataInfo();
             master.shutdown();
             master.destroy();
         }
@@ -127,4 +128,28 @@ public void testGroupCommit() throws Exception {
         public void arriving(String topic, int queueId, long logicOffset, long 
tagsCode) {
         }
     }
+
+    @Test
+    public void testFailureOnStart() throws Exception {
+        MessageStore master = new DefaultMessageStore(
+            new MessageStoreConfig(), null, new MyMessageArrivingListener(), 
new BrokerConfig());
+        MessageStore masterSpy = Mockito.spy(master);
+
+        Mockito.doThrow(new Exception("Start operation 
failed")).when(masterSpy).start();
+
+        boolean load = masterSpy.load();
+        assertThat(load).isTrue();
+
+        try {
+            masterSpy.start();
+        } catch (Exception ignored) {
+        } finally {
+            // shutdown, no messages are accepted
+            assertThat(masterSpy.putMessage(buildMessage()).isOk()).isFalse();
+
+            // shutdown without errors
+            masterSpy.shutdown();
+            masterSpy.destroy();
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to