vongosling closed pull request #133: [ROCKETMQ-249] Do not attempt to clear
disk even if disk store path i…
URL: https://github.com/apache/rocketmq/pull/133
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise show it once the pull request is merged):
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 15d41087c..f0091175d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -184,15 +184,27 @@ public static String timeMillisToHumanString3(final long
t) {
cal.get(Calendar.SECOND));
}
+ /**
+ * Estimates the disk usage percentage by the specified path.
+ *
+ * @param path Path.
+ * @return Disk usage percentage by path.
+ */
public static double getDiskPartitionSpaceUsedPercent(final String path) {
- if (null == path || path.isEmpty())
+ if (null == path || path.isEmpty()) {
+ log.warn("Can't estimate used disk space percentage by an empty
path");
+
return -1;
+ }
try {
File file = new File(path);
- if (!file.exists())
+ if (!file.exists()) {
+ log.warn("No file to estimate used disk space percentage");
+
return -1;
+ }
long totalSpace = file.getTotalSpace();
@@ -203,6 +215,8 @@ public static double getDiskPartitionSpaceUsedPercent(final
String path) {
return usedSpace / (double) totalSpace;
}
} catch (Exception e) {
+ log.warn("Failed to estimate used disk space percentage", e);
+
return -1;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index b5bac3f72..7c20f5f62 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -413,8 +413,9 @@ public CommitLog getCommitLog() {
return commitLog;
}
- public GetMessageResult getMessage(final String group, final String topic,
final int queueId, final long offset, final int maxMsgNums,
- final MessageFilter messageFilter) {
+ public GetMessageResult getMessage(final String group, final String topic,
final int queueId, final long offset,
+ final int maxMsgNums,
+ final MessageFilter messageFilter) {
if (this.shutdown) {
log.warn("message store has shutdown, so getMessage is forbidden");
return null;
@@ -800,7 +801,7 @@ public boolean appendToCommitLog(long startOffset, byte[]
data) {
@Override
public void executeDeleteFilesManually() {
- this.cleanCommitLogService.excuteDeleteFilesManualy();
+ this.cleanCommitLogService.executeDeleteFilesManualy();
}
@Override
@@ -955,7 +956,8 @@ public void cleanExpiredConsumerQueue() {
}
}
- public Map<String, Long> getMessageIds(final String topic, final int
queueId, long minOffset, long maxOffset, SocketAddress storeHost) {
+ public Map<String, Long> getMessageIds(final String topic, final int
queueId, long minOffset, long maxOffset,
+ SocketAddress storeHost) {
Map<String, Long> messageIds = new HashMap<String, Long>();
if (this.shutdown) {
return messageIds;
@@ -1419,21 +1421,24 @@ public void dispatch(DispatchRequest request) {
}
}
+ /** Service to periodically reclaim disk space. */
class CleanCommitLogService {
private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
+
private final double diskSpaceWarningLevelRatio =
Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio",
"0.90"));
private final double diskSpaceCleanForciblyRatio =
Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio",
"0.85"));
+
private long lastRedeleteTimestamp = 0;
private volatile int manualDeleteFileSeveralTimes = 0;
private volatile boolean cleanImmediately = false;
- public void excuteDeleteFilesManualy() {
+ void executeDeleteFilesManualy() {
this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
DefaultMessageStore.log.info("executeDeleteFilesManually was
invoked");
}
@@ -1488,9 +1493,9 @@ private void redeleteHangedFile() {
long currentTimestamp = System.currentTimeMillis();
if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
this.lastRedeleteTimestamp = currentTimestamp;
- int destroyMapedFileIntervalForcibly =
+ int destroyMappedFileIntervalForcibly =
DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
- if
(DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly))
{
+ if
(DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMappedFileIntervalForcibly))
{
}
}
}
@@ -1509,65 +1514,62 @@ private boolean isTimeToDelete() {
return false;
}
- private boolean isSpaceToDelete() {
- double ratio =
DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() /
100.0;
+ double getDiskUsageRatio() {
+ return UtilAll.getDiskPartitionSpaceUsedPercent(
+
DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog());
+ }
- cleanImmediately = false;
+ double getQueueSpace() {
+ return
UtilAll.getDiskPartitionSpaceUsedPercent(StorePathConfigHelper
+
.getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()));
+ }
- {
- String storePathPhysic =
DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
- double physicRatio =
UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
- if (physicRatio > diskSpaceWarningLevelRatio) {
- boolean diskok =
DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
- if (diskok) {
- DefaultMessageStore.log.error("physic disk maybe full
soon " + physicRatio + ", so mark disk full");
- }
+ /**
+ * Checks if cleaning on the disk is needed.
+ *
+ * @param usageRatio Usage ratio.
+ * @param allowedRatio Allowed ratio.
+ * @return <code>True</code> if cleaning is needed, otherwise
<code>false</code>.
+ */
+ private boolean needCleaning(double usageRatio, double allowedRatio) {
+ if (usageRatio < 0)
+ return false;
- cleanImmediately = true;
- } else if (physicRatio > diskSpaceCleanForciblyRatio) {
- cleanImmediately = true;
- } else {
- boolean diskok =
DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
- if (!diskok) {
- DefaultMessageStore.log.info("physic disk space OK " +
physicRatio + ", so mark disk ok");
- }
+ if (usageRatio > diskSpaceWarningLevelRatio) {
+ boolean diskFull =
DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
+ if (diskFull) {
+ DefaultMessageStore.log.error("Disk maybe full soon
(ratio: {}). Mark disk full", usageRatio);
}
- if (physicRatio < 0 || physicRatio > ratio) {
- DefaultMessageStore.log.info("physic disk maybe full soon,
so reclaim space, " + physicRatio);
- return true;
+ cleanImmediately = true;
+ } else if (usageRatio > diskSpaceCleanForciblyRatio) {
+ cleanImmediately = true;
+ } else {
+ boolean diskSpaceOk =
DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
+ if (diskSpaceOk) {
+ DefaultMessageStore.log.info("Marking disk space OK
(ratio: {})", usageRatio);
}
}
- {
- String storePathLogics = StorePathConfigHelper
-
.getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
- double logicsRatio =
UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
- if (logicsRatio > diskSpaceWarningLevelRatio) {
- boolean diskok =
DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
- if (diskok) {
- DefaultMessageStore.log.error("logics disk maybe full
soon " + logicsRatio + ", so mark disk full");
- }
-
- cleanImmediately = true;
- } else if (logicsRatio > diskSpaceCleanForciblyRatio) {
- cleanImmediately = true;
- } else {
- boolean diskok =
DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
- if (!diskok) {
- DefaultMessageStore.log.info("logics disk space OK " +
logicsRatio + ", so mark disk ok");
- }
- }
-
- if (logicsRatio < 0 || logicsRatio > ratio) {
- DefaultMessageStore.log.info("logics disk maybe full soon,
so reclaim space, " + logicsRatio);
- return true;
- }
+ if (usageRatio > allowedRatio) {
+ DefaultMessageStore.log.info("Disk maybe full soon (ratio:
{}). Reclaim space", usageRatio);
+ return true;
}
return false;
}
+ /**
+ * @return <code>True</code> if space needs to be reclaimed, otherwise
<code>false</code>.
+ */
+ private boolean isSpaceToDelete() {
+ double ratio =
DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() /
100.0;
+
+ cleanImmediately = false;
+
+ return needCleaning(getDiskUsageRatio(), ratio) ||
needCleaning(getQueueSpace(), ratio);
+ }
+
public int getManualDeleteFileSeveralTimes() {
return manualDeleteFileSeveralTimes;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
index 3dcd8611a..2d373441f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
@@ -126,7 +126,7 @@ public boolean getAndMakeDiskFull() {
}
public boolean getAndMakeDiskOK() {
- boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);
+ boolean result = (this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT;
this.flagBits &= ~DISK_FULL_BIT;
return result;
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/CleanCommitLogServiceTest.java
b/store/src/test/java/org/apache/rocketmq/store/CleanCommitLogServiceTest.java
new file mode 100644
index 000000000..e843cff26
--- /dev/null
+++
b/store/src/test/java/org/apache/rocketmq/store/CleanCommitLogServiceTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for CleanCommitLogService.
+ */
+public class CleanCommitLogServiceTest {
+ DefaultMessageStore messageStore;
+ DefaultMessageStore.CleanCommitLogService cleanCommitLogService;
+
+ // CleanCommitLogService class.
+ Class<?> clazz;
+
+ @Before
+ public void setUp() throws Exception {
+ clazz =
Class.forName("org.apache.rocketmq.store.DefaultMessageStore$CleanCommitLogService");
+
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStore = new DefaultMessageStore(
+ messageStoreConfig, null, null, new BrokerConfig());
+
+ Field cleanCommitLogServiceField =
messageStore.getClass().getDeclaredField("cleanCommitLogService");
+ cleanCommitLogServiceField.setAccessible(true);
+
+ cleanCommitLogService =
spy((DefaultMessageStore.CleanCommitLogService)
(cleanCommitLogServiceField.get(messageStore)));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ messageStore.shutdown();
+ }
+
+ @Test
+ public void isSpaceToDeleteNoCommitLog() throws Exception {
+ Method m = clazz.getDeclaredMethod("isSpaceToDelete");
+ m.setAccessible(true);
+
+ Assert.assertFalse((boolean) m.invoke(cleanCommitLogService));
+ }
+
+ @Test
+ public void isSpaceToDeleteWithCommitLogNoQueue() throws Exception {
+ messageStore.start();
+ messageStore.load();
+
+ Method m = clazz.getDeclaredMethod("isSpaceToDelete");
+ m.setAccessible(true);
+
+ Assert.assertFalse((boolean) m.invoke(cleanCommitLogService));
+ }
+
+ @Test
+ public void isSpaceToDeleteWithCommitLogDiskFull() throws Exception {
+ messageStore.start();
+ messageStore.load();
+
+ Method m = clazz.getDeclaredMethod("isSpaceToDelete");
+ m.setAccessible(true);
+
+ when(cleanCommitLogService.getDiskUsageRatio()).thenReturn(0.9);
+ Assert.assertTrue((boolean) m.invoke(cleanCommitLogService));
+ }
+
+ @Test
+ public void isSpaceToDeleteWithCommitLogQueueFull() throws Exception {
+ messageStore.start();
+ messageStore.load();
+
+ Method m = clazz.getDeclaredMethod("isSpaceToDelete");
+ m.setAccessible(true);
+
+ when(cleanCommitLogService.getDiskUsageRatio()).thenReturn(0.1);
+ when(cleanCommitLogService.getQueueSpace()).thenReturn(0.9);
+ Assert.assertTrue((boolean) m.invoke(cleanCommitLogService));
+ }
+}
diff --git a/store/src/test/resources/logback-test.xml
b/store/src/test/resources/logback-test.xml
index 875b6715a..ac9ea831d 100644
--- a/store/src/test/resources/logback-test.xml
+++ b/store/src/test/resources/logback-test.xml
@@ -24,7 +24,7 @@
</encoder>
</appender>
- <logger name="org.apache.rocketmq.store" level="WARN" additivity="false">
+ <logger name="RocketmqStore" level="WARN" additivity="false">
<appender-ref ref="STDOUT"/>
</logger>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services