vongosling closed pull request #143: [ROCKETMQ-256] Clean commit log files 
manually to specified disk usage watermark
URL: https://github.com/apache/rocketmq/pull/143
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it once the pull request is merged):

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 57565a640..27e8d9eab 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -49,6 +49,63 @@ public ConsumerOffsetManager(BrokerController 
brokerController) {
         this.brokerController = brokerController;
     }
 
+    public long computeConsumedPhysicalOffset() {
+
+        Map<String/* Topic */, Map<Integer/* Queue ID */, Long /* Minimum 
consumed physical offset */>> offsets = new HashMap<>();
+
+        Map<String/* Bar topic */, Map<Integer/* Queue ID */, String /* Bar 
consumer group */>> barTopicQueueConsumerGroup = new HashMap<>();
+
+        for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, Long>> next : 
offsetTable.entrySet()) {
+            String[] segments = next.getKey().split("@");
+            if (!offsets.containsKey(segments[0])) {
+                offsets.put(segments[0], next.getValue());
+                barTopicQueueConsumerGroup.put(segments[0], new 
HashMap<Integer, String>());
+                for (Map.Entry<Integer, Long> entry : 
next.getValue().entrySet()) {
+                    
barTopicQueueConsumerGroup.get(segments[0]).put(entry.getKey(), segments[1]);
+                }
+            } else {
+                Map<Integer, Long> queueOffsets = offsets.get(segments[0]);
+
+                ConcurrentMap<Integer, Long> otherOffsets = next.getValue();
+                for (Map.Entry<Integer, Long> row : otherOffsets.entrySet()) {
+                    if (queueOffsets.get(row.getKey()) > row.getValue()) {
+                        queueOffsets.put(row.getKey(), row.getValue());
+                        
barTopicQueueConsumerGroup.get(segments[0]).put(row.getKey(), segments[1]);
+                    }
+                }
+            }
+        }
+
+        long minOffset = Long.MAX_VALUE;
+
+        // The slowest topic and consumer group in terms of commit log.
+        String barTopic = null;
+        String barConsumerGroup = null;
+
+        for (Map.Entry<String, Map<Integer, Long>> next : offsets.entrySet()) {
+            String topic = next.getKey();
+            for (Map.Entry<Integer, Long> it : next.getValue().entrySet()) {
+                int queueId = it.getKey();
+                Long consumeOffset = it.getValue();
+                Long maxOffsetInQueue = 
brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
+                if (consumeOffset < maxOffsetInQueue) {
+                    long physicalOffset = 
brokerController.getMessageStore().getCommitLogOffsetInQueue(topic, queueId, 
consumeOffset);
+                    if (physicalOffset < minOffset) {
+                        minOffset = physicalOffset;
+                        barTopic = topic;
+                        barConsumerGroup = 
barTopicQueueConsumerGroup.get(topic).get(queueId);
+                    }
+                } else {
+                    log.debug("Messages in Topic: {} and queue ID: {} are all 
consumed in time", topic, queueId);
+                }
+            }
+        }
+
+        log.info("Bar topic@ConsumerGroup is {}", barTopic + "@" + 
barConsumerGroup);
+
+        return minOffset;
+    }
+
     public void scanUnsubscribedTopic() {
         Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = 
this.offsetTable.entrySet().iterator();
         while (it.hasNext()) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 690f70bfc..8ad96751d 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -21,6 +21,7 @@
 import java.util.LinkedList;
 import java.util.Set;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
 import org.apache.rocketmq.store.CommitLogDispatcher;
 import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.GetMessageResult;
@@ -246,4 +247,14 @@ public void setConfirmOffset(long phyOffset) {
     public ConsumeQueue getConsumeQueue(String topic, int queueId) {
         return next.getConsumeQueue(topic, queueId);
     }
+
+    @Override
+    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
+        return null;
+    }
+
+    @Override
+    public void purge(int watermark, long consumedPhysicalOffset, boolean 
force) {
+
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 71fdda931..ddbd944e4 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -66,6 +66,7 @@
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.CleanCommitLogRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
@@ -199,6 +200,8 @@ public RemotingCommand processRequest(ChannelHandlerContext 
ctx, RemotingCommand
                 return fetchAllConsumeStatsInBroker(ctx, request);
             case RequestCode.QUERY_CONSUME_QUEUE:
                 return queryConsumeQueue(ctx, request);
+            case RequestCode.CLEAN_COMMIT_LOG:
+                return cleanCommitLog(ctx, request);
             default:
                 break;
         }
@@ -206,6 +209,20 @@ public RemotingCommand 
processRequest(ChannelHandlerContext ctx, RemotingCommand
         return null;
     }
 
+    private RemotingCommand cleanCommitLog(ChannelHandlerContext ctx, 
RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+
+        final CleanCommitLogRequestHeader requestHeader = 
(CleanCommitLogRequestHeader)
+            
request.decodeCommandCustomHeader(CleanCommitLogRequestHeader.class);
+
+        long consumedPhysicalOffset = 
brokerController.getConsumerOffsetManager().computeConsumedPhysicalOffset();
+        brokerController.getMessageStore().purge(requestHeader.getWatermark(), 
consumedPhysicalOffset, requestHeader.isForce());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark("OK");
+        return response;
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 4244bddcf..71d06b46c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -18,14 +18,14 @@
 
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.Iterator;
-import java.util.Collections;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.PullCallback;
@@ -49,12 +49,12 @@
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -80,6 +80,7 @@
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.CleanCommitLogRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -2042,7 +2043,8 @@ public QueryConsumeQueueResponseBody 
queryConsumeQueue(final String brokerAddr,
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_QUEUE, 
requestHeader);
 
-        RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 brokerAddr), request, timeoutMillis);
+        RemotingCommand response = this.remotingClient.invokeSync(
+            MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
brokerAddr), request, timeoutMillis);
 
         assert response != null;
 
@@ -2067,7 +2069,8 @@ public void checkClientInBroker(final String brokerAddr, 
final String consumerGr
 
         request.setBody(requestBody.encode());
 
-        RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 brokerAddr), request, timeoutMillis);
+        RemotingCommand response = this.remotingClient.invokeSync(
+            MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
brokerAddr), request, timeoutMillis);
 
         assert response != null;
 
@@ -2075,4 +2078,22 @@ public void checkClientInBroker(final String brokerAddr, 
final String consumerGr
             throw new MQClientException(response.getCode(), 
response.getRemark());
         }
     }
+
+    public void cleanCommitLog(String brokerAddress, int watermark, boolean 
force, long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
+        MQClientException {
+        CleanCommitLogRequestHeader requestHeader = new 
CleanCommitLogRequestHeader();
+        requestHeader.setForce(force);
+        requestHeader.setWatermark(watermark);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CLEAN_COMMIT_LOG, 
requestHeader);
+
+        RemotingCommand response = this.remotingClient.invokeSync(
+            MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
brokerAddress), request, timeoutMillis);
+
+        assert null != response;
+
+        if (ResponseCode.SUCCESS != response.getCode()) {
+            throw new MQClientException(response.getCode(), 
response.getRemark());
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 6f132f7cd..be85911c6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -166,4 +166,6 @@
     public static final int SEND_BATCH_MESSAGE = 320;
 
     public static final int QUERY_CONSUME_QUEUE = 321;
+
+    public static final int CLEAN_COMMIT_LOG = 330;
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CleanCommitLogRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CleanCommitLogRequestHeader.java
new file mode 100644
index 000000000..acd58387c
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CleanCommitLogRequestHeader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class CleanCommitLogRequestHeader implements CommandCustomHeader {
+
+    private boolean force;
+
+    @CFNotNull
+    private int watermark;
+
+    public boolean isForce() {
+        return force;
+    }
+
+    public void setForce(boolean force) {
+        this.force = force;
+    }
+
+    public int getWatermark() {
+        return watermark;
+    }
+
+    public void setWatermark(int watermark) {
+        this.watermark = watermark;
+    }
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java 
b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index 265645213..174fe33cd 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -89,6 +89,14 @@ public void testGetDiskPartitionSpaceUsedPercent() {
         
assertThat(UtilAll.getDiskPartitionSpaceUsedPercent(tmpDir)).isNotCloseTo(-1, 
within(0.000001));
     }
 
+
+    @Test
+    public void testGetDiskPartitionSpaceUsedPercent4Home() {
+        String tmpDir = System.getProperty("user.home");
+        double percent = UtilAll.getDiskPartitionSpaceUsedPercent(tmpDir);
+        System.out.println(percent);
+    }
+
     @Test
     public void testIsBlank() {
         assertThat(UtilAll.isBlank("Hello ")).isFalse();
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 0810d0ca3..dae8dbe83 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -134,6 +134,10 @@ public int deleteExpiredFile(//
         return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, 
deleteFilesInterval, intervalForcibly, cleanImmediately);
     }
 
+    public int deleteExpiredFile(final double ratio, final long 
physicalOffset, final boolean force, int deleteFileInterval) {
+        return this.mappedFileQueue.deleteExpiredFileByGoal(ratio, 
physicalOffset, force, deleteFileInterval);
+    }
+
     /**
      * Read CommitLog data, use data replication
      */
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index b5bac3f72..2a10c6f28 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -579,20 +579,25 @@ public GetMessageResult getMessage(final String group, 
final String topic, final
     }
 
     /**
-
+     * Return maximum consume offset in this consume queue.
+     * @param topic Topic name
+     * @param queueId Queue ID
+     * @return maximum consume offset.
      */
     public long getMaxOffsetInQueue(String topic, int queueId) {
         ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
         if (logic != null) {
-            long offset = logic.getMaxOffsetInQueue();
-            return offset;
+            return logic.getMaxOffsetInQueue();
         }
 
         return 0;
     }
 
     /**
-
+     * Return minimum consume offset in this consume queue.
+     * @param topic Topic name to query
+     * @param queueId Queue ID
+     * @return minimum consume offset.
      */
     public long getMinOffsetInQueue(String topic, int queueId) {
         ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
@@ -800,7 +805,7 @@ public boolean appendToCommitLog(long startOffset, byte[] 
data) {
 
     @Override
     public void executeDeleteFilesManually() {
-        this.cleanCommitLogService.excuteDeleteFilesManualy();
+        this.cleanCommitLogService.executeDeleteFilesManually();
     }
 
     @Override
@@ -1392,6 +1397,14 @@ public void run() {
         }, 6, TimeUnit.SECONDS);
     }
 
+    @Override
+    public void purge(int watermark, long consumedPhysicalOffset, boolean 
force) {
+        cleanCommitLogService.setPurgeForcefullyWhenManual(force);
+        cleanCommitLogService.setDiskSpaceManuallyCleanRatio(watermark / 
100.0);
+        
cleanCommitLogService.setConsumedPhysicalOffset(consumedPhysicalOffset);
+        cleanCommitLogService.executeDeleteFilesManually();
+    }
+
     class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
 
         @Override
@@ -1422,18 +1435,24 @@ public void dispatch(DispatchRequest request) {
     class CleanCommitLogService {
 
         private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
+
         private final double diskSpaceWarningLevelRatio =
             
Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio",
 "0.90"));
 
         private final double diskSpaceCleanForciblyRatio =
             
Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio",
 "0.85"));
+
         private long lastRedeleteTimestamp = 0;
 
         private volatile int manualDeleteFileSeveralTimes = 0;
 
         private volatile boolean cleanImmediately = false;
 
-        public void excuteDeleteFilesManualy() {
+        private double diskSpaceManuallyCleanRatio = 1.0;
+        private long consumedPhysicalOffset;
+        private boolean purgeForcefullyWhenManual;
+
+        public void executeDeleteFilesManually() {
             this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
             DefaultMessageStore.log.info("executeDeleteFilesManually was 
invoked");
         }
@@ -1452,32 +1471,38 @@ private void deleteExpiredFiles() {
             int deleteCount = 0;
             long fileReservedTime = 
DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
             int deletePhysicFilesInterval = 
DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
-            int destroyMapedFileIntervalForcibly = 
DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
+            int destroyMappedFileIntervalForcibly = 
DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
 
-            boolean timeup = this.isTimeToDelete();
-            boolean spacefull = this.isSpaceToDelete();
-            boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
+            boolean timeUp = this.isTimeToDelete();
+            boolean diskFull = this.isSpaceToDelete();
 
-            if (timeup || spacefull || manualDelete) {
+            if (manualDeleteFileSeveralTimes > 0) {
+                manualDeleteFileSeveralTimes--;
+                deleteCount = 
commitLog.deleteExpiredFile(diskSpaceManuallyCleanRatio, consumedPhysicalOffset,
+                    purgeForcefullyWhenManual, deletePhysicFilesInterval);
+                if (deleteCount <= 0) {
+                    log.warn("Try to purge commit log manually, but failed to 
delete any commit log file");
+                }
+                return;
+            }
 
-                if (manualDelete)
-                    this.manualDeleteFileSeveralTimes--;
+            if (timeUp || diskFull) {
 
                 boolean cleanAtOnce = 
DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && 
this.cleanImmediately;
 
-                log.info("begin to delete before {} hours file. timeup: {} 
spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", //
+                log.info("begin to delete before {} hours file. timeUp: {} 
diskFull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", //
                     fileReservedTime, //
-                    timeup, //
-                    spacefull, //
+                    timeUp, //
+                    diskFull, //
                     manualDeleteFileSeveralTimes, //
                     cleanAtOnce);
 
-                fileReservedTime *= 60 * 60 * 1000;
+                fileReservedTime *= 60L * 60 * 1000;
 
                 deleteCount = 
DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, 
deletePhysicFilesInterval,
-                    destroyMapedFileIntervalForcibly, cleanAtOnce);
+                    destroyMappedFileIntervalForcibly, cleanAtOnce);
                 if (deleteCount > 0) {
-                } else if (spacefull) {
+                } else if (diskFull) {
                     log.warn("disk space will be full soon, but delete file 
failed.");
                 }
             }
@@ -1488,9 +1513,9 @@ private void redeleteHangedFile() {
             long currentTimestamp = System.currentTimeMillis();
             if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
                 this.lastRedeleteTimestamp = currentTimestamp;
-                int destroyMapedFileIntervalForcibly =
+                int destroyMappedFileIntervalForcibly =
                     
DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
-                if 
(DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly))
 {
+                if 
(DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMappedFileIntervalForcibly))
 {
                 }
             }
         }
@@ -1518,8 +1543,8 @@ private boolean isSpaceToDelete() {
                 String storePathPhysic = 
DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
                 double physicRatio = 
UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
                 if (physicRatio > diskSpaceWarningLevelRatio) {
-                    boolean diskok = 
DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
-                    if (diskok) {
+                    boolean diskOK = 
DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
+                    if (diskOK) {
                         DefaultMessageStore.log.error("physic disk maybe full 
soon " + physicRatio + ", so mark disk full");
                     }
 
@@ -1527,8 +1552,8 @@ private boolean isSpaceToDelete() {
                 } else if (physicRatio > diskSpaceCleanForciblyRatio) {
                     cleanImmediately = true;
                 } else {
-                    boolean diskok = 
DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
-                    if (!diskok) {
+                    boolean diskOK = 
DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
+                    if (!diskOK) {
                         DefaultMessageStore.log.info("physic disk space OK " + 
physicRatio + ", so mark disk ok");
                     }
                 }
@@ -1544,23 +1569,23 @@ private boolean isSpaceToDelete() {
                     
.getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
                 double logicsRatio = 
UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
                 if (logicsRatio > diskSpaceWarningLevelRatio) {
-                    boolean diskok = 
DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
-                    if (diskok) {
-                        DefaultMessageStore.log.error("logics disk maybe full 
soon " + logicsRatio + ", so mark disk full");
+                    boolean diskOK = 
DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
+                    if (diskOK) {
+                        DefaultMessageStore.log.error("Consume queue disk 
maybe full soon " + logicsRatio + ", so mark disk full");
                     }
 
                     cleanImmediately = true;
                 } else if (logicsRatio > diskSpaceCleanForciblyRatio) {
                     cleanImmediately = true;
                 } else {
-                    boolean diskok = 
DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
-                    if (!diskok) {
-                        DefaultMessageStore.log.info("logics disk space OK " + 
logicsRatio + ", so mark disk ok");
+                    boolean diskOK = 
DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
+                    if (!diskOK) {
+                        DefaultMessageStore.log.info("Consume queue disk space 
OK " + logicsRatio + ", so mark disk ok");
                     }
                 }
 
                 if (logicsRatio < 0 || logicsRatio > ratio) {
-                    DefaultMessageStore.log.info("logics disk maybe full soon, 
so reclaim space, " + logicsRatio);
+                    DefaultMessageStore.log.info("Consume queue disk maybe 
full soon, so reclaim space, " + logicsRatio);
                     return true;
                 }
             }
@@ -1575,6 +1600,18 @@ public int getManualDeleteFileSeveralTimes() {
         public void setManualDeleteFileSeveralTimes(int 
manualDeleteFileSeveralTimes) {
             this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
         }
+
+        public void setDiskSpaceManuallyCleanRatio(double 
diskSpaceManuallyCleanRatio) {
+            this.diskSpaceManuallyCleanRatio = diskSpaceManuallyCleanRatio;
+        }
+
+        public void setConsumedPhysicalOffset(long consumedPhysicalOffset) {
+            this.consumedPhysicalOffset = consumedPhysicalOffset;
+        }
+
+        public void setPurgeForcefullyWhenManual(boolean 
purgeForcefullyWhenManual) {
+            this.purgeForcefullyWhenManual = purgeForcefullyWhenManual;
+        }
     }
 
     class CleanConsumeQueueService {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index a8fa36485..f3ffc19a3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -339,34 +339,33 @@ public int deleteExpiredFileByTime(final long expiredTime,
         final boolean cleanImmediately) {
         Object[] mfs = this.copyMappedFiles(0);
 
-        if (null == mfs)
+        if (null == mfs) {
             return 0;
+        }
 
         int mfsLength = mfs.length - 1;
         int deleteCount = 0;
         List<MappedFile> files = new ArrayList<MappedFile>();
-        if (null != mfs) {
-            for (int i = 0; i < mfsLength; i++) {
-                MappedFile mappedFile = (MappedFile) mfs[i];
-                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() 
+ expiredTime;
-                if (System.currentTimeMillis() >= liveMaxTimestamp || 
cleanImmediately) {
-                    if (mappedFile.destroy(intervalForcibly)) {
-                        files.add(mappedFile);
-                        deleteCount++;
-
-                        if (files.size() >= DELETE_FILES_BATCH_MAX) {
-                            break;
-                        }
+        for (int i = 0; i < mfsLength; i++) {
+            MappedFile mappedFile = (MappedFile) mfs[i];
+            long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + 
expiredTime;
+            if (System.currentTimeMillis() >= liveMaxTimestamp || 
cleanImmediately) {
+                if (mappedFile.destroy(intervalForcibly)) {
+                    files.add(mappedFile);
+                    deleteCount++;
 
-                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
-                            try {
-                                Thread.sleep(deleteFilesInterval);
-                            } catch (InterruptedException e) {
-                            }
-                        }
-                    } else {
+                    if (files.size() >= DELETE_FILES_BATCH_MAX) {
                         break;
                     }
+
+                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
+                        try {
+                            Thread.sleep(deleteFilesInterval);
+                        } catch (InterruptedException ignore) {
+                        }
+                    }
+                } else {
+                    break;
                 }
             }
         }
@@ -376,6 +375,54 @@ public int deleteExpiredFileByTime(final long expiredTime,
         return deleteCount;
     }
 
+    public int deleteExpiredFileByGoal(final double ratio, final long 
consumedPhysicalOffset, final boolean force,
+        int deleteFileInterval) {
+        Object[] mfs = this.copyMappedFiles(0);
+
+        if (null == mfs) {
+            return 0;
+        }
+
+        File storeFile = new File(storePath);
+        if (!storeFile.exists()) {
+            return 0;
+        }
+
+        long total = storeFile.getTotalSpace();
+        long used = storeFile.getFreeSpace();
+
+        int mfsLength = mfs.length - 1;
+        int deleteCount = 0;
+        List<MappedFile> files = new ArrayList<MappedFile>();
+        for (int i = 0; i < mfsLength; i++) {
+            MappedFile mappedFile = (MappedFile) mfs[i];
+            if (mappedFile.getFileFromOffset() + mappedFileSize < 
consumedPhysicalOffset || force) {
+                used -= mappedFileSize;
+                if (used <= total * ratio) {
+                    break;
+                }
+
+                if (mappedFile.destroy(1000 * 60)) {
+                    files.add(mappedFile);
+                    deleteCount++;
+
+                    if (deleteFileInterval > 0) {
+                        try {
+                            Thread.sleep(deleteFileInterval);
+                        } catch (InterruptedException ignore) {
+                        }
+                    }
+                } else {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        deleteExpiredFile(files);
+        return deleteCount;
+    }
+
     public int deleteExpiredFileByOffset(long offset, int unitSize) {
         Object[] mfs = this.copyMappedFiles(0);
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 55572ce10..347b9d329 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -322,4 +322,12 @@ QueryMessageResult queryMessage(final String topic, final 
String key, final int
      * @return Consume queue.
      */
     ConsumeQueue getConsumeQueue(String topic, int queueId);
+
+    /**
+     * Purges stored data so that disk usage moves towards the requested watermark.
+     *
+     * @param watermark Desired disk usage watermark. NOTE(review): presumably a percentage, since the admin
+     * command line describes its watermark option as "Watermark level in percent" - confirm against the
+     * implementation.
+     * @param consumedPhysicalOffset Physical offset up to which data is known to have been consumed.
+     * @param force Whether to purge forcefully. NOTE(review): presumably allows removal of data beyond
+     * {@code consumedPhysicalOffset} - confirm against the implementation.
+     */
+    void purge(int watermark, long consumedPhysicalOffset, boolean force);
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 409ea3322..685d6b114 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -478,4 +478,11 @@ public QueryConsumeQueueResponseBody 
queryConsumeQueue(String brokerAddr, String
             brokerAddr, topic, queueId, index, count, consumerGroup
         );
     }
+
+    // Pure delegation: forwards the request unchanged to defaultMQAdminExtImpl.
+    @Override
+    public void cleanCommitLog(String brokerAddress, int watermark, boolean 
force)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingConnectException, MQClientException,
+        RemotingSendRequestException {
+        this.defaultMQAdminExtImpl.cleanCommitLog(brokerAddress, watermark, 
force);
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 157ae21eb..946315d5a 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -963,4 +963,11 @@ public QueryConsumeQueueResponseBody 
queryConsumeQueue(String brokerAddr, String
             brokerAddr, topic, queueId, index, count, consumerGroup, 
timeoutMillis
         );
     }
+
+    // Forwards the request to the client API of mqClientInstance, adding this instance's timeoutMillis
+    // as the last argument.
+    @Override
+    public void cleanCommitLog(String brokerAddress, int watermark, boolean 
force)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingConnectException, MQClientException,
+        RemotingSendRequestException {
+        
this.mqClientInstance.getMQClientAPIImpl().cleanCommitLog(brokerAddress, 
watermark, force, timeoutMillis);
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 82add92b2..4787d516d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -263,4 +263,8 @@ QueryConsumeQueueResponseBody queryConsumeQueue(final 
String brokerAddr,
                                             final String topic, final int 
queueId,
                                             final long index, final int count, 
final String consumerGroup)
         throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException, MQClientException;
+
+    /**
+     * Asks a broker to clean its commit log files.
+     *
+     * @param brokerAddress address of the broker to send the request to.
+     * @param watermark disk usage watermark to clean down to; the cleanCommitLog command line option
+     * describes it as "Watermark level in percent".
+     * @param force whether to clean forcefully; the command line warns that messages may be lost when set.
+     */
+    void cleanCommitLog(final String brokerAddress, int watermark, boolean 
force)
+        throws InterruptedException, RemotingTimeoutException, 
RemotingConnectException, MQClientException,
+        RemotingSendRequestException;
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java 
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 6398291a0..0b4e20d91 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -31,6 +31,7 @@
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad;
 import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand;
+import org.apache.rocketmq.tools.command.broker.CleanCommitLogSubCommand;
 import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
 import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand;
 import org.apache.rocketmq.tools.command.broker.GetBrokerConfigCommand;
@@ -178,6 +179,7 @@ public static void initCommand() {
         initCommand(new UpdateOrderConfCommand());
         initCommand(new CleanExpiredCQSubCommand());
         initCommand(new CleanUnusedTopicCommand());
+        initCommand(new CleanCommitLogSubCommand());
 
         initCommand(new StartMonitoringSubCommand());
         initCommand(new StatsAllSubCommand());
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanCommitLogSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanCommitLogSubCommand.java
new file mode 100644
index 000000000..da7814ead
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanCommitLogSubCommand.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.broker;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * Admin sub-command {@code cleanCommitLog}: asks a single broker ({@code -b}) or every broker address resolved
+ * for a cluster ({@code -c}) to clean commit log files down to a disk usage watermark ({@code -w}),
+ * optionally by force ({@code -f}).
+ */
+public class CleanCommitLogSubCommand implements SubCommand {
+
+    /** Watermark (in percent) used when {@code -w} is not supplied. */
+    private static final int DEFAULT_WATERMARK = 100;
+
+    @Override
+    public String commandName() {
+        return "cleanCommitLog";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Clean commit log files";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option clusterOption = new Option("c", "clusterName", true, "Cluster name");
+        options.addOption(clusterOption);
+
+        Option brokerOption = new Option("b", "brokerAddress", true, "Broker address");
+        options.addOption(brokerOption);
+
+        Option watermarkOption = new Option("w", "watermark", true, "Watermark level in percent");
+        options.addOption(watermarkOption);
+
+        Option forceOption = new Option("f", "force", false, "Force to clean commit log to meet watermark or not. Warning: you are at risk of losing message if this option is specified");
+        options.addOption(forceOption);
+
+        return options;
+    }
+
+    /**
+     * Sends the clean request to the broker given by {@code -b}; when no broker address is given, to every
+     * address resolved for the cluster given by {@code -c}.
+     *
+     * @throws SubCommandException if the watermark is not an integer percentage in [0, 100], if neither a
+     * broker address nor a cluster name is supplied, if the calling thread is interrupted, or if the request
+     * fails for the broker (or for at least one broker of the cluster).
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+
+            String brokerAddress = commandLine.getOptionValue("b");
+            String clusterName = commandLine.getOptionValue("c");
+            int watermark = parseWatermark(commandLine);
+            boolean force = commandLine.hasOption("f");
+
+            if (null != brokerAddress && !brokerAddress.isEmpty()) {
+                try {
+                    defaultMQAdminExt.cleanCommitLog(brokerAddress, watermark, force);
+                } catch (InterruptedException e) {
+                    // Restore the flag so that callers further up can still observe the interruption.
+                    Thread.currentThread().interrupt();
+                    throw new SubCommandException("Failed to clean commit log", e);
+                } catch (RemotingTimeoutException | RemotingConnectException | RemotingSendRequestException e) {
+                    throw new SubCommandException("Failed to clean commit log", e);
+                }
+            } else if (null != clusterName && !clusterName.isEmpty()) {
+                List<String> failedList = new ArrayList<>();
+                try {
+                    // Holds master AND slave addresses, as the helper's name states.
+                    Set<String> brokerAddresses = CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName);
+                    for (String address : brokerAddresses) {
+                        try {
+                            defaultMQAdminExt.cleanCommitLog(address, watermark, force);
+                        } catch (InterruptedException e) {
+                            // An interruption is a request to stop, not a per-broker failure to skip over.
+                            Thread.currentThread().interrupt();
+                            throw new SubCommandException("Failed to clean commit log", e);
+                        } catch (Exception e) {
+                            // Best effort: remember the broker and keep going with the remaining ones.
+                            failedList.add(address);
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new SubCommandException("Failed to figure out addresses of brokers by cluster name", e);
+                } catch (RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException | MQBrokerException e) {
+                    throw new SubCommandException("Failed to figure out addresses of brokers by cluster name", e);
+                }
+
+                if (!failedList.isEmpty()) {
+                    throw new SubCommandException("Failed to clean commit log files for some brokers: " + failedList.toString());
+                }
+            } else {
+                // print help
+                throw new SubCommandException("Either brokerAddress or clusterName is required");
+            }
+
+        } catch (MQClientException e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    /**
+     * Reads the {@code -w} option as an integer percentage.
+     *
+     * @return the parsed watermark, or {@link #DEFAULT_WATERMARK} when the option is absent.
+     * @throws SubCommandException if the value is not an integer or lies outside [0, 100].
+     */
+    private static int parseWatermark(CommandLine commandLine) throws SubCommandException {
+        if (!commandLine.hasOption("w")) {
+            return DEFAULT_WATERMARK;
+        }
+
+        String raw = commandLine.getOptionValue("w");
+        int watermark;
+        try {
+            // Integer.parseInt also rejects null with a NumberFormatException.
+            watermark = Integer.parseInt(raw);
+        } catch (NumberFormatException e) {
+            throw new SubCommandException("Watermark must be an integer percentage, but was: " + raw, e);
+        }
+
+        if (watermark < 0 || watermark > 100) {
+            throw new SubCommandException("Watermark must be between 0 and 100 percent, but was: " + raw);
+        }
+        return watermark;
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to