Repository: incubator-rocketmq
Updated Branches:
  refs/heads/master d9c398f88 -> 11ff542c4


[ROCKETMQ-45]Delete hanged consume queue files


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/11ff542c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/11ff542c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/11ff542c

Branch: refs/heads/master
Commit: 11ff542c4382faf3bb7a74a80329f1fcab6a07a6
Parents: d9c398f
Author: Zhanhui Li <[email protected]>
Authored: Thu Jan 12 15:02:26 2017 +0800
Committer: Zhanhui Li <[email protected]>
Committed: Sun Jan 22 14:48:16 2017 +0800

----------------------------------------------------------------------
 .../org/apache/rocketmq/store/ConsumeQueue.java | 28 +++++------
 .../rocketmq/store/DefaultMessageStore.java     | 14 +++---
 .../org/apache/rocketmq/store/MappedFile.java   | 22 ++++-----
 .../apache/rocketmq/store/MappedFileQueue.java  |  4 ++
 .../store/schedule/ScheduleMessageService.java  |  2 +-
 .../rocketmq/store/MappedFileQueueTest.java     | 50 ++++++++++++++++++--
 6 files changed, 84 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11ff542c/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index c5e38db..dc1c96c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -77,19 +77,19 @@ public class ConsumeQueue {
             if (index < 0)
                 index = 0;
 
-            int mapedFileSizeLogics = this.mappedFileSize;
+            int mappedFileSizeLogics = this.mappedFileSize;
             MappedFile mappedFile = mappedFiles.get(index);
             ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
             long processOffset = mappedFile.getFileFromOffset();
-            long mapedFileOffset = 0;
+            long mappedFileOffset = 0;
             while (true) {
-                for (int i = 0; i < mapedFileSizeLogics; i += 
CQ_STORE_UNIT_SIZE) {
+                for (int i = 0; i < mappedFileSizeLogics; i += 
CQ_STORE_UNIT_SIZE) {
                     long offset = byteBuffer.getLong();
                     int size = byteBuffer.getInt();
                     long tagsCode = byteBuffer.getLong();
 
                     if (offset >= 0 && size > 0) {
-                        mapedFileOffset = i + CQ_STORE_UNIT_SIZE;
+                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                         this.maxPhysicOffset = offset;
                     } else {
                         log.info("recover current consume queue file over,  " 
+ mappedFile.getFileName() + " "
@@ -98,7 +98,7 @@ public class ConsumeQueue {
                     }
                 }
 
-                if (mapedFileOffset == mapedFileSizeLogics) {
+                if (mappedFileOffset == mappedFileSizeLogics) {
                     index++;
                     if (index >= mappedFiles.size()) {
 
@@ -109,17 +109,17 @@ public class ConsumeQueue {
                         mappedFile = mappedFiles.get(index);
                         byteBuffer = mappedFile.sliceByteBuffer();
                         processOffset = mappedFile.getFileFromOffset();
-                        mapedFileOffset = 0;
+                        mappedFileOffset = 0;
                         log.info("recover next consume queue file, " + 
mappedFile.getFileName());
                     }
                 } else {
                     log.info("recover current consume queue queue over " + 
mappedFile.getFileName() + " "
-                        + (processOffset + mapedFileOffset));
+                        + (processOffset + mappedFileOffset));
                     break;
                 }
             }
 
-            processOffset += mapedFileOffset;
+            processOffset += mappedFileOffset;
             this.mappedFileQueue.setFlushedWhere(processOffset);
             this.mappedFileQueue.setCommittedWhere(processOffset);
             this.mappedFileQueue.truncateDirtyFiles(processOffset);
@@ -310,7 +310,7 @@ public class ConsumeQueue {
 
                         if (offsetPy >= phyMinOffset) {
                             this.minLogicOffset = 
result.getMappedFile().getFileFromOffset() + i;
-                            log.info("compute logics min offset: " + 
this.getMinOffsetInQuque() + ", topic: "
+                            log.info("compute logics min offset: " + 
this.getMinOffsetInQueue() + ", topic: "
                                 + this.topic + ", queueId: " + this.queueId);
                             break;
                         }
@@ -324,7 +324,7 @@ public class ConsumeQueue {
         }
     }
 
-    public long getMinOffsetInQuque() {
+    public long getMinOffsetInQueue() {
         return this.minLogicOffset / CQ_STORE_UNIT_SIZE;
     }
 
@@ -435,8 +435,8 @@ public class ConsumeQueue {
     }
 
     public long rollNextFile(final long index) {
-        int mapedFileSize = this.mappedFileSize;
-        int totalUnitsInFile = mapedFileSize / CQ_STORE_UNIT_SIZE;
+        int mappedFileSize = this.mappedFileSize;
+        int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE;
         return index + totalUnitsInFile - index % totalUnitsInFile;
     }
 
@@ -463,10 +463,10 @@ public class ConsumeQueue {
     }
 
     public long getMessageTotalInQueue() {
-        return this.getMaxOffsetInQuque() - this.getMinOffsetInQuque();
+        return this.getMaxOffsetInQueue() - this.getMinOffsetInQueue();
     }
 
-    public long getMaxOffsetInQuque() {
+    public long getMaxOffsetInQueue() {
         return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11ff542c/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 3a43c21..2594ef3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -376,8 +376,8 @@ public class DefaultMessageStore implements MessageStore {
 
         ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
         if (consumeQueue != null) {
-            minOffset = consumeQueue.getMinOffsetInQuque();
-            maxOffset = consumeQueue.getMaxOffsetInQuque();
+            minOffset = consumeQueue.getMinOffsetInQueue();
+            maxOffset = consumeQueue.getMaxOffsetInQueue();
 
             if (maxOffset == 0) {
                 status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
@@ -499,7 +499,7 @@ public class DefaultMessageStore implements MessageStore {
     public long getMaxOffsetInQuque(String topic, int queueId) {
         ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
         if (logic != null) {
-            long offset = logic.getMaxOffsetInQuque();
+            long offset = logic.getMaxOffsetInQueue();
             return offset;
         }
 
@@ -512,7 +512,7 @@ public class DefaultMessageStore implements MessageStore {
     public long getMinOffsetInQuque(String topic, int queueId) {
         ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
         if (logic != null) {
-            return logic.getMinOffsetInQuque();
+            return logic.getMinOffsetInQueue();
         }
 
         return -1;
@@ -878,8 +878,8 @@ public class DefaultMessageStore implements MessageStore {
 
         ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
         if (consumeQueue != null) {
-            minOffset = Math.max(minOffset, 
consumeQueue.getMinOffsetInQuque());
-            maxOffset = Math.min(maxOffset, 
consumeQueue.getMaxOffsetInQuque());
+            minOffset = Math.max(minOffset, 
consumeQueue.getMinOffsetInQueue());
+            maxOffset = Math.min(maxOffset, 
consumeQueue.getMaxOffsetInQueue());
 
             if (maxOffset == 0) {
                 return messageIds;
@@ -1220,7 +1220,7 @@ public class DefaultMessageStore implements MessageStore {
         for (ConcurrentHashMap<Integer, ConsumeQueue> maps : 
this.consumeQueueTable.values()) {
             for (ConsumeQueue logic : maps.values()) {
                 String key = logic.getTopic() + "-" + logic.getQueueId();
-                table.put(key, logic.getMaxOffsetInQuque());
+                table.put(key, logic.getMaxOffsetInQueue());
                 logic.correctMinOffset(minPhyOffset);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11ff542c/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index cd4b57b..feb505d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -43,9 +43,9 @@ public class MappedFile extends ReferenceResource {
     public static final int OS_PAGE_SIZE = 1024 * 4;
     protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
-    private static final AtomicLong TOTAL_MAPED_VITUAL_MEMORY = new 
AtomicLong(0);
+    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new 
AtomicLong(0);
 
-    private static final AtomicInteger TOTAL_MAPED_FILES = new 
AtomicInteger(0);
+    private static final AtomicInteger TOTAL_MAPPED_FILES = new 
AtomicInteger(0);
     protected final AtomicInteger wrotePosition = new AtomicInteger(0);
     //ADD BY ChenYang
     protected final AtomicInteger committedPosition = new AtomicInteger(0);
@@ -132,12 +132,12 @@ public class MappedFile extends ReferenceResource {
             return viewed(viewedBuffer);
     }
 
-    public static int getTotalmapedfiles() {
-        return TOTAL_MAPED_FILES.get();
+    public static int getTotalMappedFiles() {
+        return TOTAL_MAPPED_FILES.get();
     }
 
-    public static long getTotalMapedVitualMemory() {
-        return TOTAL_MAPED_VITUAL_MEMORY.get();
+    public static long getTotalMappedVirtualMemory() {
+        return TOTAL_MAPPED_VIRTUAL_MEMORY.get();
     }
 
     public void init(final String fileName, final int fileSize, final 
TransientStorePool transientStorePool) throws IOException {
@@ -158,8 +158,8 @@ public class MappedFile extends ReferenceResource {
         try {
             this.fileChannel = new RandomAccessFile(this.file, 
"rw").getChannel();
             this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 
0, fileSize);
-            TOTAL_MAPED_VITUAL_MEMORY.addAndGet(fileSize);
-            TOTAL_MAPED_FILES.incrementAndGet();
+            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
+            TOTAL_MAPPED_FILES.incrementAndGet();
             ok = true;
         } catch (FileNotFoundException e) {
             log.error("create file channel " + this.fileName + " Failed. ", e);
@@ -405,8 +405,8 @@ public class MappedFile extends ReferenceResource {
         }
 
         clean(this.mappedByteBuffer);
-        TOTAL_MAPED_VITUAL_MEMORY.addAndGet(this.fileSize * (-1));
-        TOTAL_MAPED_FILES.decrementAndGet();
+        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
+        TOTAL_MAPPED_FILES.decrementAndGet();
         log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " 
OK");
         return true;
     }
@@ -431,7 +431,7 @@ public class MappedFile extends ReferenceResource {
 
             return true;
         } else {
-            log.warn("destroy maped file[REF:" + this.getRefCount() + "] " + 
this.fileName
+            log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + 
this.fileName
                 + " Failed. cleanupOver: " + this.cleanupOver);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11ff542c/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index f98b26a..5c6c62c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -397,11 +397,15 @@ public class MappedFileQueue {
                         log.info("physic min offset " + offset + ", logics in 
current mappedFile max offset "
                             + maxOffsetInLogicQueue + ", delete it");
                     }
+                } else if (!mappedFile.isAvailable()) { // Handle hanged file.
+                    log.warn("Found a hanged consume queue file, attempting to 
delete it.");
+                    destroy = true;
                 } else {
                     log.warn("this being not executed forever.");
                     break;
                 }
 
+                // TODO: Externalize this hardcoded value
                 if (destroy && mappedFile.destroy(1000 * 60)) {
                     files.add(mappedFile);
                     deleteCount++;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11ff542c/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
----------------------------------------------------------------------
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
 
b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 5b42082..e08a6f5 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -324,7 +324,7 @@ public class ScheduleMessageService extends ConfigManager {
 
 
                      */
-                    long cqMinOffset = cq.getMinOffsetInQuque();
+                    long cqMinOffset = cq.getMinOffsetInQueue();
                     if (offset < cqMinOffset) {
                         failScheduleOffset = cqMinOffset;
                         log.error("schedule CQ offset invalid. offset=" + 
offset + ", cqMinOffset="

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/11ff542c/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
----------------------------------------------------------------------
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java 
b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
index 90fbbd3..2d6c112 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
@@ -20,6 +20,8 @@
  */
 package org.apache.rocketmq.store;
 
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class MappedFileQueueTest {
@@ -55,7 +58,7 @@ public class MappedFileQueueTest {
     }
 
     @Test
-    public void test_getLastMapedFile() {
+    public void test_getLastMappedFile() {
         final String fixedMsg = "0123456789abcdef";
 
         
logger.debug("================================================================");
@@ -79,7 +82,7 @@ public class MappedFileQueueTest {
     }
 
     @Test
-    public void test_findMapedFileByOffset() {
+    public void test_findMappedFileByOffset() {
         // four-byte string.
         final String fixedMsg = "abcd";
 
@@ -179,7 +182,7 @@ public class MappedFileQueueTest {
     }
 
     @Test
-    public void test_getMapedMemorySize() {
+    public void test_getMappedMemorySize() {
         final String fixedMsg = "abcd";
 
         
logger.debug("================================================================");
@@ -200,4 +203,45 @@ public class MappedFileQueueTest {
         mappedFileQueue.destroy();
         logger.debug("MappedFileQueue.getMappedMemorySize() OK");
     }
+
+
+    @Test
+    public void test_deleteExpiredFileByOffset() {
+
+        
logger.debug("================================================================");
+        MappedFileQueue mappedFileQueue =
+            new MappedFileQueue("target/unit_test_store/e", 5120, null);
+
+        for (int i = 0; i < 2048; i++) {
+            MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
+            assertNotNull(mappedFile);
+
+            ByteBuffer byteBuffer = 
ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            byteBuffer.putLong(i);
+            byte[] padding = new byte[12];
+            Arrays.fill(padding, (byte)'0');
+            byteBuffer.put(padding);
+            byteBuffer.flip();
+
+            boolean result = mappedFile.appendMessage(byteBuffer.array());
+
+            assertTrue(result);
+        }
+
+        MappedFile first = mappedFileQueue.getFirstMappedFile();
+        first.hold();
+
+        int count = mappedFileQueue.deleteExpiredFileByOffset(20480, 
ConsumeQueue.CQ_STORE_UNIT_SIZE);
+        assertEquals(0, count);
+        first.release();
+
+        count = mappedFileQueue.deleteExpiredFileByOffset(20480, 
ConsumeQueue.CQ_STORE_UNIT_SIZE);
+        assertTrue(count > 0);
+        first = mappedFileQueue.getFirstMappedFile();
+        assertTrue(first.getFileFromOffset() > 0);
+
+        mappedFileQueue.shutdown(1000);
+        mappedFileQueue.destroy();
+        logger.debug("MappedFileQueue.deleteExpiredFileByOffset() OK");
+    }
 }

Reply via email to