This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new bee5077bcb [ISSUE #6336] [RIP-62] Cold Read Control  (#6507)
bee5077bcb is described below

commit bee5077bcb77411f103aafb2220184f59db2c95e
Author: Drizzle <[email protected]>
AuthorDate: Mon Jun 5 17:09:31 2023 +0800

    [ISSUE #6336] [RIP-62] Cold Read Control  (#6507)
---
 .../apache/rocketmq/broker/BrokerController.java   |  38 ++++
 .../rocketmq/broker/coldctr/ColdCtrStrategy.java   |  42 ++++
 .../broker/coldctr/ColdDataCgCtrService.java       | 250 +++++++++++++++++++++
 .../coldctr/ColdDataPullRequestHoldService.java    | 105 +++++++++
 .../broker/coldctr/PIDAdaptiveColdCtrStrategy.java |  85 +++++++
 .../broker/coldctr/SimpleColdCtrStrategy.java      |  51 +++++
 .../broker/processor/AdminBrokerProcessor.java     | 135 +++++++++++
 .../broker/processor/PullMessageProcessor.java     |  38 +++-
 broker/src/main/resources/rmq.broker.logback.xml   |  33 +++
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  77 +++++++
 .../org/apache/rocketmq/common/BrokerConfig.java   |  36 +++
 .../java/org/apache/rocketmq/common/MixAll.java    |  17 ++
 .../rocketmq/common/coldctr/AccAndTimeStamp.java   |  63 ++++++
 .../common/constant/FIleReadaheadMode.java         |  21 ++
 .../rocketmq/common/constant/LoggerName.java       |   1 +
 .../rocketmq/remoting/protocol/RequestCode.java    |   5 +
 .../java/org/apache/rocketmq/store/CommitLog.java  | 239 +++++++++++++++++++-
 .../apache/rocketmq/store/DefaultMessageStore.java |  14 ++
 .../apache/rocketmq/store/GetMessageResult.java    |  10 +
 .../rocketmq/store/SelectMappedBufferResult.java   |  10 +
 .../rocketmq/store/config/MessageStoreConfig.java  |  54 +++++
 .../java/org/apache/rocketmq/store/util/LibC.java  |   6 +
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  28 +++
 .../tools/admin/DefaultMQAdminExtImpl.java         |  27 +++
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  12 +
 .../rocketmq/tools/command/MQAdminStartup.java     |   9 +
 .../broker/CommitLogSetReadAheadSubCommand.java    | 106 +++++++++
 .../broker/GetColdDataFlowCtrInfoSubCommand.java   | 124 ++++++++++
 ...RemoveColdDataFlowCtrGroupConfigSubCommand.java |  93 ++++++++
 ...UpdateColdDataFlowCtrGroupConfigSubCommand.java | 103 +++++++++
 30 files changed, 1827 insertions(+), 5 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8560bde9fa..73da996ae8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -51,6 +51,8 @@ import 
org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
 import org.apache.rocketmq.broker.client.ProducerManager;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
 import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
+import org.apache.rocketmq.broker.coldctr.ColdDataCgCtrService;
+import org.apache.rocketmq.broker.coldctr.ColdDataPullRequestHoldService;
 import org.apache.rocketmq.broker.controller.ReplicasManager;
 import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
 import org.apache.rocketmq.broker.failover.EscapeBridge;
@@ -265,6 +267,8 @@ public class BrokerController {
     protected ReplicasManager replicasManager;
     private long lastSyncTimeMs = System.currentTimeMillis();
     private BrokerMetricsManager brokerMetricsManager;
+    private ColdDataPullRequestHoldService coldDataPullRequestHoldService;
+    private ColdDataCgCtrService coldDataCgCtrService;
 
     public BrokerController(
         final BrokerConfig brokerConfig,
@@ -321,6 +325,8 @@ public class BrokerController {
         this.broker2Client = new Broker2Client(this);
         this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new 
LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
         this.scheduleMessageService = new ScheduleMessageService(this);
+        this.coldDataPullRequestHoldService = new 
ColdDataPullRequestHoldService(this);
+        this.coldDataCgCtrService = new ColdDataCgCtrService(this);
 
         if (nettyClientConfig != null) {
             this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
@@ -1403,6 +1409,14 @@ public class BrokerController {
             this.brokerPreOnlineService.shutdown();
         }
 
+        if (this.coldDataPullRequestHoldService != null) {
+            this.coldDataPullRequestHoldService.shutdown();
+        }
+
+        if (this.coldDataCgCtrService != null) {
+            this.coldDataCgCtrService.shutdown();
+        }
+
         
shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
         shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);
 
@@ -1547,6 +1561,13 @@ public class BrokerController {
             this.brokerPreOnlineService.start();
         }
 
+        if (this.coldDataPullRequestHoldService != null) {
+            this.coldDataPullRequestHoldService.start();
+        }
+
+        if (this.coldDataCgCtrService != null) {
+            this.coldDataCgCtrService.start();
+        }
     }
 
     public void start() throws Exception {
@@ -2297,4 +2318,21 @@ public class BrokerController {
     public BlockingQueue<Runnable> getAdminBrokerThreadPoolQueue() {
         return adminBrokerThreadPoolQueue;
     }
+
+    public ColdDataPullRequestHoldService getColdDataPullRequestHoldService() {
+        return coldDataPullRequestHoldService;
+    }
+
+    public void setColdDataPullRequestHoldService(
+        ColdDataPullRequestHoldService coldDataPullRequestHoldService) {
+        this.coldDataPullRequestHoldService = coldDataPullRequestHoldService;
+    }
+
+    public ColdDataCgCtrService getColdDataCgCtrService() {
+        return coldDataCgCtrService;
+    }
+
+    public void setColdDataCgCtrService(ColdDataCgCtrService 
coldDataCgCtrService) {
+        this.coldDataCgCtrService = coldDataCgCtrService;
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdCtrStrategy.java 
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdCtrStrategy.java
new file mode 100644
index 0000000000..11fa0e707f
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdCtrStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+public interface ColdCtrStrategy {
+    /**
+     * Calculate the determining factor about whether to accelerate or 
decelerate
+     * @return
+     */
+    Double decisionFactor();
+    /**
+     * Promote the speed for consumerGroup to read cold data
+     * @param consumerGroup
+     * @param currentThreshold
+     */
+    void promote(String consumerGroup, Long currentThreshold);
+    /**
+     * Decelerate the speed for consumerGroup to read cold data
+     * @param consumerGroup
+     * @param currentThreshold
+     */
+    void decelerate(String consumerGroup, Long currentThreshold);
+    /**
+     * Collect the total number of cold read data in the system
+     * @param globalAcc
+     */
+    void collect(Long globalAcc);
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
new file mode 100644
index 0000000000..dd9278fb75
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.alibaba.fastjson.JSONObject;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.SystemClock;
+import org.apache.rocketmq.common.coldctr.AccAndTimeStamp;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+/**
+ * store the cg cold read ctr table and acc the size of the cold
+ * reading msg, timing to clear the table and set acc to zero
+ */
+public class ColdDataCgCtrService extends ServiceThread {
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_COLDCTR_LOGGER_NAME);
+    private final SystemClock systemClock = new SystemClock();
+    private final long cgColdAccResideTimeoutMills = 60 * 1000;
+    private static final AtomicLong GLOBAL_ACC = new AtomicLong(0L);
+    private static final String ADAPTIVE = "||adaptive";
+    /**
+     * as soon as the consumerGroup read the cold data then it will be put 
into @code cgColdThresholdMapRuntime,
+     * and it also will be removed when does not read cold data in @code 
cgColdAccResideTimeoutMills later;
+     */
+    private final ConcurrentHashMap<String, AccAndTimeStamp> 
cgColdThresholdMapRuntime = new ConcurrentHashMap<>();
+    /**
+     * if the system admin wants to set the special cold read threshold for 
some consumerGroup, the configuration will
+     * be putted into @code cgColdThresholdMapConfig
+     */
+    private final ConcurrentHashMap<String, Long> cgColdThresholdMapConfig = 
new ConcurrentHashMap<>();
+    private final BrokerConfig brokerConfig;
+    private final MessageStoreConfig messageStoreConfig;
+    private final ColdCtrStrategy coldCtrStrategy;
+
+    public ColdDataCgCtrService(BrokerController brokerController) {
+        this.brokerConfig = brokerController.getBrokerConfig();
+        this.messageStoreConfig = brokerController.getMessageStoreConfig();
+        this.coldCtrStrategy = brokerConfig.isUsePIDColdCtrStrategy() ? new 
PIDAdaptiveColdCtrStrategy(this, 
(long)(brokerConfig.getGlobalColdReadThreshold() * 0.8)) : new 
SimpleColdCtrStrategy(this);
+    }
+
+    @Override
+    public String getServiceName() {
+        return ColdDataCgCtrService.class.getSimpleName();
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                if (messageStoreConfig.isColdDataFlowControlEnable()) {
+                    this.waitForRunning(5 * 1000);
+                } else {
+                    this.waitForRunning(180 * 1000);
+                }
+                long beginLockTimestamp = this.systemClock.now();
+                clearDataAcc();
+                if (!brokerConfig.isColdCtrStrategyEnable()) {
+                    clearAdaptiveConfig();
+                }
+                long costTime = this.systemClock.now() - beginLockTimestamp;
+                log.info("[{}] clearTheDataAcc-cost {} ms.", costTime > 3 * 
1000 ? "NOTIFYME" : "OK", costTime);
+            } catch (Throwable e) {
+                log.warn(this.getServiceName() + " service has exception", e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    public String getColdDataFlowCtrInfo() {
+        JSONObject result = new JSONObject();
+        result.put("runtimeTable", this.cgColdThresholdMapRuntime);
+        result.put("configTable", this.cgColdThresholdMapConfig);
+        result.put("cgColdReadThreshold", 
this.brokerConfig.getCgColdReadThreshold());
+        result.put("globalColdReadThreshold", 
this.brokerConfig.getGlobalColdReadThreshold());
+        result.put("globalAcc", GLOBAL_ACC.get());
+        return result.toJSONString();
+    }
+
+    /**
+     * clear the long time no cold read cg in the table;
+     * update the acc to zero for the cg in the table;
+     * use the strategy to promote or decelerate the cg;
+     */
+    private void clearDataAcc() {
+        log.info("clearDataAcc cgColdThresholdMapRuntime key size: {}", 
cgColdThresholdMapRuntime.size());
+        if (brokerConfig.isColdCtrStrategyEnable()) {
+            coldCtrStrategy.collect(GLOBAL_ACC.get());
+        }
+        Iterator<Entry<String, AccAndTimeStamp>> iterator = 
cgColdThresholdMapRuntime.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Entry<String, AccAndTimeStamp> next = iterator.next();
+            if (System.currentTimeMillis() >= cgColdAccResideTimeoutMills + 
next.getValue().getLastColdReadTimeMills()) {
+                if (brokerConfig.isColdCtrStrategyEnable()) {
+                    
cgColdThresholdMapConfig.remove(buildAdaptiveKey(next.getKey()));
+                }
+                iterator.remove();
+            } else if (next.getValue().getColdAcc().get() >= 
getThresholdByConsumerGroup(next.getKey())) {
+                log.info("Coldctr consumerGroup: {}, acc: {}, threshold: {}", 
next.getKey(), next.getValue().getColdAcc().get(), 
getThresholdByConsumerGroup(next.getKey()));
+                if (brokerConfig.isColdCtrStrategyEnable() && 
!isGlobalColdCtr() && !isAdminConfig(next.getKey())) {
+                    coldCtrStrategy.promote(buildAdaptiveKey(next.getKey()), 
getThresholdByConsumerGroup(next.getKey()));
+                }
+            }
+            next.getValue().getColdAcc().set(0L);
+        }
+        if (isGlobalColdCtr()) {
+            log.info("Coldctr global acc: {}, threshold: {}", 
GLOBAL_ACC.get(), this.brokerConfig.getGlobalColdReadThreshold());
+        }
+        if (brokerConfig.isColdCtrStrategyEnable()) {
+            sortAndDecelerate();
+        }
+        GLOBAL_ACC.set(0L);
+    }
+
+    private void sortAndDecelerate() {
+        List<Entry<String, Long>> configMapList = new ArrayList<Entry<String, 
Long>>(cgColdThresholdMapConfig.entrySet());
+        configMapList.sort(new Comparator<Entry<String, Long>>() {
+            @Override
+            public int compare(Entry<String, Long> o1, Entry<String, Long> o2) 
{
+                return (int)(o2.getValue() - o1.getValue());
+            }
+        });
+        Iterator<Entry<String, Long>> iterator = configMapList.iterator();
+        int maxDecelerate = 3;
+        while (iterator.hasNext() && maxDecelerate > 0) {
+            Entry<String, Long> next = iterator.next();
+            if (!isAdminConfig(next.getKey())) {
+                coldCtrStrategy.decelerate(next.getKey(), 
getThresholdByConsumerGroup(next.getKey()));
+                maxDecelerate --;
+            }
+        }
+    }
+
+    public void coldAcc(String consumerGroup, long coldDataToAcc) {
+        if (coldDataToAcc <= 0) {
+            return;
+        }
+        GLOBAL_ACC.addAndGet(coldDataToAcc);
+        AccAndTimeStamp atomicAcc = 
cgColdThresholdMapRuntime.get(consumerGroup);
+        if (null == atomicAcc) {
+            atomicAcc = new AccAndTimeStamp(new AtomicLong(coldDataToAcc));
+            atomicAcc = cgColdThresholdMapRuntime.putIfAbsent(consumerGroup, 
atomicAcc);
+        }
+        if (null != atomicAcc) {
+            atomicAcc.getColdAcc().addAndGet(coldDataToAcc);
+            atomicAcc.setLastColdReadTimeMills(System.currentTimeMillis());
+        }
+    }
+
+    public void addOrUpdateGroupConfig(String consumerGroup, Long threshold) {
+        cgColdThresholdMapConfig.put(consumerGroup, threshold);
+    }
+
+    public void removeGroupConfig(String consumerGroup) {
+        cgColdThresholdMapConfig.remove(consumerGroup);
+    }
+
+    public boolean isCgNeedColdDataFlowCtr(String consumerGroup) {
+        if (!this.messageStoreConfig.isColdDataFlowControlEnable()) {
+            return false;
+        }
+        if (MixAll.isSysConsumerGroupForNoColdReadLimit(consumerGroup)) {
+            return false;
+        }
+        AccAndTimeStamp accAndTimeStamp = 
cgColdThresholdMapRuntime.get(consumerGroup);
+        if (null == accAndTimeStamp) {
+            return false;
+        }
+
+        Long threshold = getThresholdByConsumerGroup(consumerGroup);
+        if (accAndTimeStamp.getColdAcc().get() >= threshold) {
+            return true;
+        }
+        return GLOBAL_ACC.get() >= 
this.brokerConfig.getGlobalColdReadThreshold();
+    }
+
+    public boolean isGlobalColdCtr() {
+        return GLOBAL_ACC.get() > 
this.brokerConfig.getGlobalColdReadThreshold();
+    }
+
+    public BrokerConfig getBrokerConfig() {
+        return brokerConfig;
+    }
+
+    private Long getThresholdByConsumerGroup(String consumerGroup) {
+        if (isAdminConfig(consumerGroup)) {
+            if (consumerGroup.endsWith(ADAPTIVE)) {
+                return 
cgColdThresholdMapConfig.get(consumerGroup.split(ADAPTIVE)[0]);
+            }
+            return cgColdThresholdMapConfig.get(consumerGroup);
+        }
+        Long threshold = null;
+        if (brokerConfig.isColdCtrStrategyEnable()) {
+            if (consumerGroup.endsWith(ADAPTIVE)) {
+                threshold = cgColdThresholdMapConfig.get(consumerGroup);
+            } else {
+                threshold = 
cgColdThresholdMapConfig.get(buildAdaptiveKey(consumerGroup));
+            }
+        }
+        if (null == threshold) {
+            threshold = this.brokerConfig.getCgColdReadThreshold();
+        }
+        return threshold;
+    }
+
+    private String buildAdaptiveKey(String consumerGroup) {
+        return consumerGroup + ADAPTIVE;
+    }
+
+    private boolean isAdminConfig(String consumerGroup) {
+        if (consumerGroup.endsWith(ADAPTIVE)) {
+            consumerGroup = consumerGroup.split(ADAPTIVE)[0];
+        }
+        return cgColdThresholdMapConfig.containsKey(consumerGroup);
+    }
+
+    private void clearAdaptiveConfig() {
+        cgColdThresholdMapConfig.entrySet().removeIf(next -> 
next.getKey().endsWith(ADAPTIVE));
+    }
+
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataPullRequestHoldService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataPullRequestHoldService.java
new file mode 100644
index 0000000000..c38d886fd3
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataPullRequestHoldService.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.longpolling.PullRequest;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.SystemClock;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+/**
+ * just requests are type of pull have the qualification to be put into this 
hold queue.
+ * if the pull request is reading cold data and that request will be cold at 
the first time,
+ * then the pull request will be cold in this @code 
pullRequestLinkedBlockingQueue,
+ * in @code coldTimeoutMillis later the pull request will be warm and marked 
holded
+ */
+public class ColdDataPullRequestHoldService extends ServiceThread {
+
+    private static final Logger log = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_COLDCTR_LOGGER_NAME);
+    public static final String NO_SUSPEND_KEY = "_noSuspend_";
+
+    private final long coldHoldTimeoutMillis = 3000;
+    private final SystemClock systemClock = new SystemClock();
+    private final BrokerController brokerController;
+    private final LinkedBlockingQueue<PullRequest> pullRequestColdHoldQueue = 
new LinkedBlockingQueue<>(10000);
+
+    public void suspendColdDataReadRequest(PullRequest pullRequest) {
+        if 
(this.brokerController.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+            pullRequestColdHoldQueue.offer(pullRequest);
+        }
+    }
+
+    public ColdDataPullRequestHoldService(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    @Override
+    public String getServiceName() {
+        return ColdDataPullRequestHoldService.class.getSimpleName();
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                if 
(!this.brokerController.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+                    this.waitForRunning(20 * 1000);
+                } else {
+                    this.waitForRunning(5 * 1000);
+                }
+                long beginClockTimestamp = this.systemClock.now();
+                this.checkColdDataPullRequest();
+                long costTime = this.systemClock.now() - beginClockTimestamp;
+                log.info("[{}] checkColdDataPullRequest-cost {} ms.", costTime 
> 5 * 1000 ? "NOTIFYME" : "OK", costTime);
+            } catch (Throwable e) {
+                log.warn(this.getServiceName() + " service has exception", e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    private void checkColdDataPullRequest() {
+        int succTotal = 0, errorTotal = 0, queueSize = 
pullRequestColdHoldQueue.size() ;
+        Iterator<PullRequest> iterator = pullRequestColdHoldQueue.iterator();
+        while (iterator.hasNext()) {
+            PullRequest pullRequest = iterator.next();
+            if (System.currentTimeMillis() >= 
pullRequest.getSuspendTimestamp() + coldHoldTimeoutMillis) {
+                try {
+                    
pullRequest.getRequestCommand().addExtField(NO_SUSPEND_KEY, "1");
+                    
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
+                        pullRequest.getClientChannel(), 
pullRequest.getRequestCommand());
+                    succTotal++;
+                } catch (Exception e) {
+                    log.error("PullRequestColdHoldService 
checkColdDataPullRequest error", e);
+                    errorTotal++;
+                }
+                //remove the timeout request from the iterator
+                iterator.remove();
+            }
+        }
+        log.info("checkColdPullRequest-info-finish, queueSize: {} 
successTotal: {} errorTotal: {}",
+            queueSize, succTotal, errorTotal);
+    }
+
+}
\ No newline at end of file
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/PIDAdaptiveColdCtrStrategy.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/PIDAdaptiveColdCtrStrategy.java
new file mode 100644
index 0000000000..87d9789f71
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/PIDAdaptiveColdCtrStrategy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class PIDAdaptiveColdCtrStrategy implements ColdCtrStrategy {
+    /**
+     * Stores the maximum number of recent et val
+     */
+    private static final int MAX_STORE_NUMS = 10;
+    /**
+     * The weights of the three modules of the PID formula
+     */
+    private static final Double KP = 0.5, KI = 0.3, KD = 0.2;
+    private final List<Long> historyEtValList = new ArrayList<>();
+    private final ColdDataCgCtrService coldDataCgCtrService;
+    private final Long expectGlobalVal;
+    private long et = 0L;
+
+    public PIDAdaptiveColdCtrStrategy(ColdDataCgCtrService 
coldDataCgCtrService, Long expectGlobalVal) {
+        this.coldDataCgCtrService = coldDataCgCtrService;
+        this.expectGlobalVal = expectGlobalVal;
+    }
+
+    @Override
+    public Double decisionFactor() {
+        if (historyEtValList.size() < MAX_STORE_NUMS) {
+            return 0.0;
+        }
+        Long et1 = historyEtValList.get(historyEtValList.size() - 1);
+        Long et2 = historyEtValList.get(historyEtValList.size() - 2);
+        Long differential = et1 - et2;
+        Double integration = 0.0;
+        for (Long item: historyEtValList) {
+            integration += item;
+        }
+        return  KP * et + KI * integration + KD * differential;
+    }
+
+    @Override
+    public void promote(String consumerGroup, Long currentThreshold) {
+        if (decisionFactor() > 0) {
+            coldDataCgCtrService.addOrUpdateGroupConfig(consumerGroup, 
(long)(currentThreshold * 1.5));
+        }
+    }
+
+    @Override
+    public void decelerate(String consumerGroup, Long currentThreshold) {
+        if (decisionFactor() < 0) {
+            long changedThresholdVal = (long)(currentThreshold * 0.8);
+            if (changedThresholdVal < 
coldDataCgCtrService.getBrokerConfig().getCgColdReadThreshold()) {
+                changedThresholdVal = 
coldDataCgCtrService.getBrokerConfig().getCgColdReadThreshold();
+            }
+            coldDataCgCtrService.addOrUpdateGroupConfig(consumerGroup, 
changedThresholdVal);
+        }
+    }
+
+    @Override
+    public void collect(Long globalAcc) {
+        et = expectGlobalVal - globalAcc;
+        historyEtValList.add(et);
+        Iterator<Long> iterator = historyEtValList.iterator();
+        while (historyEtValList.size() > MAX_STORE_NUMS && iterator.hasNext()) 
{
+            iterator.next();
+            iterator.remove();
+        }
+    }
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/SimpleColdCtrStrategy.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/SimpleColdCtrStrategy.java
new file mode 100644
index 0000000000..f26a242f9a
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/SimpleColdCtrStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+public class SimpleColdCtrStrategy implements ColdCtrStrategy {
+    private final ColdDataCgCtrService coldDataCgCtrService;
+
+    public SimpleColdCtrStrategy(ColdDataCgCtrService coldDataCgCtrService) {
+        this.coldDataCgCtrService = coldDataCgCtrService;
+    }
+
+    @Override
+    public Double decisionFactor() {
+        return null;
+    }
+
+    @Override
+    public void promote(String consumerGroup, Long currentThreshold) {
+        coldDataCgCtrService.addOrUpdateGroupConfig(consumerGroup, 
(long)(currentThreshold * 1.5));
+    }
+
+    @Override
+    public void decelerate(String consumerGroup, Long currentThreshold) {
+        if (!coldDataCgCtrService.isGlobalColdCtr()) {
+            return;
+        }
+        long changedThresholdVal = (long)(currentThreshold * 0.8);
+        if (changedThresholdVal < 
coldDataCgCtrService.getBrokerConfig().getCgColdReadThreshold()) {
+            changedThresholdVal = 
coldDataCgCtrService.getBrokerConfig().getCgColdReadThreshold();
+        }
+        coldDataCgCtrService.addOrUpdateGroupConfig(consumerGroup, 
changedThresholdVal);
+    }
+
+    @Override
+    public void collect(Long globalAcc) {
+    }
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0a05239e7f..892a713308 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -60,6 +60,7 @@ import org.apache.rocketmq.common.UnlockCallback;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.attribute.AttributeParser;
 import org.apache.rocketmq.common.constant.ConsumeInitMode;
+import org.apache.rocketmq.common.constant.FIleReadaheadMode;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -186,6 +187,7 @@ import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.ReferredIterator;
 import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
+import org.apache.rocketmq.store.util.LibC;
 
 import static 
org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
 
@@ -215,6 +217,14 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
                 return this.updateBrokerConfig(ctx, request);
             case RequestCode.GET_BROKER_CONFIG:
                 return this.getBrokerConfig(ctx, request);
+            case RequestCode.UPDATE_COLD_DATA_FLOW_CTR_CONFIG:
+                return this.updateColdDataFlowCtrGroupConfig(ctx, request);
+            case RequestCode.REMOVE_COLD_DATA_FLOW_CTR_CONFIG:
+                return this.removeColdDataFlowCtrGroupConfig(ctx, request);
+            case RequestCode.GET_COLD_DATA_FLOW_CTR_INFO:
+                return this.getColdDataFlowCtrInfo(ctx);
+            case RequestCode.SET_COMMITLOG_READ_MODE:
+                return this.setCommitLogReadaheadMode(ctx, request);
             case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
                 return this.searchOffsetByTimestamp(ctx, request);
             case RequestCode.GET_MAX_OFFSET:
@@ -759,6 +769,131 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
+    private synchronized RemotingCommand 
updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx, RemotingCommand 
request) {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+        LOGGER.info("updateColdDataFlowCtrGroupConfig called by {}", 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        byte[] body = request.getBody();
+        if (body != null) {
+            try {
+                String bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
+                Properties properties = MixAll.string2Properties(bodyStr);
+                if (properties != null) {
+                    LOGGER.info("updateColdDataFlowCtrGroupConfig new config: 
{}, client: {}", properties, ctx.channel().remoteAddress());
+                    properties.entrySet().stream().forEach(i -> {
+                        try {
+                            String consumerGroup = String.valueOf(i.getKey());
+                            Long threshold = 
Long.valueOf(String.valueOf(i.getValue()));
+                            
this.brokerController.getColdDataCgCtrService().addOrUpdateGroupConfig(consumerGroup,
 threshold);
+                        } catch (Exception e) {
+                            LOGGER.error("updateColdDataFlowCtrGroupConfig 
properties on entry error, key: {}, val: {}", i.getKey(), i.getValue(), e);
+                        }
+                    });
+                } else {
+                    LOGGER.error("updateColdDataFlowCtrGroupConfig 
string2Properties error");
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("string2Properties error");
+                    return response;
+                }
+            } catch (UnsupportedEncodingException e) {
+                LOGGER.error("updateColdDataFlowCtrGroupConfig 
UnsupportedEncodingException", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+        }
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    private synchronized RemotingCommand 
removeColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx,
+        RemotingCommand request) {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+        LOGGER.info("removeColdDataFlowCtrGroupConfig called by {}", 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        byte[] body = request.getBody();
+        if (body != null) {
+            try {
+                String consumerGroup = new String(body, 
MixAll.DEFAULT_CHARSET);
+                if (consumerGroup != null) {
+                    LOGGER.info("removeColdDataFlowCtrGroupConfig, 
consumerGroup: {} client: {}", consumerGroup, ctx.channel().remoteAddress());
+                    
this.brokerController.getColdDataCgCtrService().removeGroupConfig(consumerGroup);
+                } else {
+                    LOGGER.error("removeColdDataFlowCtrGroupConfig string 
parse error");
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("string parse error");
+                    return response;
+                }
+            } catch (UnsupportedEncodingException e) {
+                LOGGER.error("removeColdDataFlowCtrGroupConfig 
UnsupportedEncodingException", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+        }
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    private RemotingCommand getColdDataFlowCtrInfo(ChannelHandlerContext ctx) {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+        LOGGER.info("getColdDataFlowCtrInfo called by {}", 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        String content = 
this.brokerController.getColdDataCgCtrService().getColdDataFlowCtrInfo();
+        if (content != null) {
+            try {
+                response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
+            } catch (UnsupportedEncodingException e) {
+                LOGGER.error("getColdDataFlowCtrInfo 
UnsupportedEncodingException", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+        }
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    private RemotingCommand setCommitLogReadaheadMode(ChannelHandlerContext 
ctx, RemotingCommand request) {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+        LOGGER.info("setCommitLogReadaheadMode called by {}", 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        try {
+            HashMap<String, String> extFields = request.getExtFields();
+            if (null == extFields) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("set commitlog readahead mode param error");
+                return response;
+            }
+            int mode = 
Integer.parseInt(extFields.get(FIleReadaheadMode.READ_AHEAD_MODE));
+            if (mode != LibC.MADV_RANDOM && mode != LibC.MADV_NORMAL) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("set commitlog readahead mode param value 
error");
+                return response;
+            }
+            MessageStore messageStore = 
this.brokerController.getMessageStore();
+            if (messageStore instanceof DefaultMessageStore) {
+                DefaultMessageStore defaultMessageStore = 
(DefaultMessageStore)messageStore;
+                if (mode == LibC.MADV_NORMAL) {
+                    
defaultMessageStore.getMessageStoreConfig().setDataReadAheadEnable(true);
+                } else {
+                    
defaultMessageStore.getMessageStoreConfig().setDataReadAheadEnable(false);
+                }
+                
defaultMessageStore.getCommitLog().scanFileAndSetReadMode(mode);
+            }
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark("set commitlog readahead mode success, mode: " 
+ mode);
+        } catch (Exception e) {
+            LOGGER.error("set commitlog readahead mode failed", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("set commitlog readahead mode failed");
+        }
+        return response;
+    }
+
     private synchronized RemotingCommand 
updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 8df2265c2c..b2794b1289 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -24,10 +24,12 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.coldctr.ColdDataPullRequestHoldService;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
 import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.broker.longpolling.PullRequest;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
@@ -65,6 +67,7 @@ import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
 import org.apache.rocketmq.remoting.rpc.RpcClientUtils;
 import org.apache.rocketmq.remoting.rpc.RpcRequest;
 import org.apache.rocketmq.remoting.rpc.RpcResponse;
+import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageFilter;
@@ -283,7 +286,7 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
     @Override
     public RemotingCommand processRequest(final ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
-        return this.processRequest(ctx.channel(), request, true);
+        return this.processRequest(ctx.channel(), request, true, true);
     }
 
     @Override
@@ -295,7 +298,7 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
         return false;
     }
 
-    private RemotingCommand processRequest(final Channel channel, 
RemotingCommand request, boolean brokerAllowSuspend)
+    private RemotingCommand processRequest(final Channel channel, 
RemotingCommand request, boolean brokerAllowSuspend, boolean 
brokerAllowFlowCtrSuspend)
         throws RemotingCommandException {
         RemotingCommand response = 
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
         final PullMessageResponseHeader responseHeader = 
(PullMessageResponseHeader) response.readCustomHeader();
@@ -484,6 +487,32 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
         }
 
         final MessageStore messageStore = brokerController.getMessageStore();
+        if (this.brokerController.getMessageStore() instanceof 
DefaultMessageStore) {
+            DefaultMessageStore defaultMessageStore = 
(DefaultMessageStore)this.brokerController.getMessageStore();
+            boolean cgNeedColdDataFlowCtr = 
brokerController.getColdDataCgCtrService().isCgNeedColdDataFlowCtr(requestHeader.getConsumerGroup());
+            if (cgNeedColdDataFlowCtr) {
+                boolean isMsgLogicCold = defaultMessageStore.getCommitLog()
+                    
.getColdDataCheckService().isMsgInColdArea(requestHeader.getConsumerGroup(),
+                        requestHeader.getTopic(), requestHeader.getQueueId(), 
requestHeader.getQueueOffset());
+                if (isMsgLogicCold) {
+                    ConsumeType consumeType = 
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()).getConsumeType();
+                    if (consumeType == ConsumeType.CONSUME_PASSIVELY) {
+                        response.setCode(ResponseCode.SYSTEM_BUSY);
+                        response.setRemark("This consumer group is reading 
cold data. It has been flow control");
+                        return response;
+                    } else if (consumeType == ConsumeType.CONSUME_ACTIVELY) {
+                        if (brokerAllowFlowCtrSuspend) {  // second arrived, 
which will not be held
+                            PullRequest pullRequest = new PullRequest(request, 
channel, 1000,
+                                this.brokerController.getMessageStore().now(), 
requestHeader.getQueueOffset(), subscriptionData, messageFilter);
+                            
this.brokerController.getColdDataPullRequestHoldService().suspendColdDataReadRequest(pullRequest);
+                            return null;
+                        }
+                        requestHeader.setMaxMsgNums(1);
+                    }
+                }
+            }
+        }
+
         final boolean useResetOffsetFeature = 
brokerController.getBrokerConfig().isUseServerSideResetOffset();
         String topic = requestHeader.getTopic();
         String group = requestHeader.getConsumerGroup();
@@ -515,7 +544,7 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
                             finalResponse.setRemark("store getMessage return 
null");
                             return finalResponse;
                         }
-
+                        
brokerController.getColdDataCgCtrService().coldAcc(requestHeader.getConsumerGroup(),
 result.getColdDataSum());
                         return pullMessageResultHandler.handle(
                             result,
                             request,
@@ -746,7 +775,8 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
     public void executeRequestWhenWakeup(final Channel channel, final 
RemotingCommand request) {
         Runnable run = () -> {
             try {
-                final RemotingCommand response = 
PullMessageProcessor.this.processRequest(channel, request, false);
+                boolean brokerAllowFlowCtrSuspend = !(request.getExtFields() 
!= null && 
request.getExtFields().containsKey(ColdDataPullRequestHoldService.NO_SUSPEND_KEY));
+                final RemotingCommand response = 
PullMessageProcessor.this.processRequest(channel, request, false, 
brokerAllowFlowCtrSuspend);
 
                 if (response != null) {
                     response.setOpaque(request.getOpaque());
diff --git a/broker/src/main/resources/rmq.broker.logback.xml 
b/broker/src/main/resources/rmq.broker.logback.xml
index 7902c0526a..78b1aea411 100644
--- a/broker/src/main/resources/rmq.broker.logback.xml
+++ b/broker/src/main/resources/rmq.broker.logback.xml
@@ -498,6 +498,34 @@
         <appender-ref ref="RocketmqPopSiftingAppender_inner"/>
     </appender>
 
+    <appender name="RocketmqColdCtrSiftingAppender" 
class="ch.qos.logback.classic.sift.SiftingAppender">
+        <discriminator>
+            <key>brokerContainerLogDir</key>
+            <defaultValue>${file.separator}</defaultValue>
+        </discriminator>
+        <sift>
+            <appender name="RocketmqColdCtrAppender"
+                      class="ch.qos.logback.core.rolling.RollingFileAppender">
+                <file>${user.home}/logs/rocketmqlogs/coldctr.log</file>
+                <append>true</append>
+                <rollingPolicy 
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+                    
<fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/coldctr.%i.log
+                    </fileNamePattern>
+                    <minIndex>1</minIndex>
+                    <maxIndex>10</maxIndex>
+                </rollingPolicy>
+                <triggeringPolicy
+                        
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+                    <maxFileSize>100MB</maxFileSize>
+                </triggeringPolicy>
+                <encoder>
+                    <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - 
%m%n</pattern>
+                    <charset class="java.nio.charset.Charset">UTF-8</charset>
+                </encoder>
+            </appender>
+        </sift>
+    </appender>
+
     <appender name="RocketmqBrokerMetricsAppender" 
class="ch.qos.logback.classic.sift.SiftingAppender">
         <discriminator>
             <key>brokerContainerLogDir</key>
@@ -595,6 +623,11 @@
         <appender-ref ref="RocketmqPopSiftingAppender"/>
     </logger>
 
+    <logger name="RocketmqColdCtr" additivity="false">
+        <level value="INFO"/>
+        <appender-ref ref="RocketmqColdCtrSiftingAppender"/>
+    </logger>
+
     <logger name="RocketmqTraffic" additivity="false" level="INFO">
         <appender-ref ref="RocketmqTrafficSiftingAppender"/>
     </logger>
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 2c7a988ee4..995362bb77 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.attribute.AttributeParser;
+import org.apache.rocketmq.common.constant.FIleReadaheadMode;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -1748,6 +1749,82 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
         throw new MQBrokerException(response.getCode(), response.getRemark(), 
addr);
     }
 
+    public void updateColdDataFlowCtrGroupConfig(final String addr, final 
Properties properties, final long timeoutMillis)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQBrokerException, 
UnsupportedEncodingException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_COLD_DATA_FLOW_CTR_CONFIG,
 null);
+        String str = MixAll.properties2String(properties);
+        if (str != null && str.length() > 0) {
+            request.setBody(str.getBytes(MixAll.DEFAULT_CHARSET));
+            RemotingCommand response = this.remotingClient.invokeSync(
+                
MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), 
request, timeoutMillis);
+            switch (response.getCode()) {
+                case ResponseCode.SUCCESS: {
+                    return;
+                }
+                default:
+                    break;
+            }
+            throw new MQBrokerException(response.getCode(), 
response.getRemark());
+        }
+    }
+
+    public void removeColdDataFlowCtrGroupConfig(final String addr, final 
String consumerGroup, final long timeoutMillis)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQBrokerException, 
UnsupportedEncodingException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.REMOVE_COLD_DATA_FLOW_CTR_CONFIG,
 null);
+        if (consumerGroup != null && consumerGroup.length() > 0) {
+            request.setBody(consumerGroup.getBytes(MixAll.DEFAULT_CHARSET));
+            RemotingCommand response = this.remotingClient.invokeSync(
+                
MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), 
request, timeoutMillis);
+            switch (response.getCode()) {
+                case ResponseCode.SUCCESS: {
+                    return;
+                }
+                default:
+                    break;
+            }
+            throw new MQBrokerException(response.getCode(), 
response.getRemark());
+        }
+    }
+
+    public String getColdDataFlowCtrInfo(final String addr, final long 
timeoutMillis)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQBrokerException, 
UnsupportedEncodingException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_COLD_DATA_FLOW_CTR_INFO, 
null);
+        RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                if (null != response.getBody() && response.getBody().length > 
0) {
+                    return new String(response.getBody(), 
MixAll.DEFAULT_CHARSET);
+                }
+                return null;
+            }
+            default:
+                break;
+        }
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    public String setCommitLogReadAheadMode(final String addr, final String 
mode, final long timeoutMillis)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQBrokerException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.SET_COMMITLOG_READ_MODE, null);
+        HashMap<String, String> extFields = new HashMap<>();
+        extFields.put(FIleReadaheadMode.READ_AHEAD_MODE, mode);
+        request.setExtFields(extFields);
+        RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                if (null != response.getRemark() && 
response.getRemark().length() > 0) {
+                    return response.getRemark();
+                }
+                return null;
+            }
+            default:
+                break;
+        }
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
     public ClusterInfo getBrokerClusterInfo(
         final long timeoutMillis) throws InterruptedException, 
RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException, 
MQBrokerException {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index e9fad05e51..47ce2cb8d1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -370,6 +370,10 @@ public class BrokerConfig extends BrokerIdentity {
      */
     private boolean estimateAccumulation = true;
 
+    private boolean coldCtrStrategyEnable = false;
+    private boolean usePIDColdCtrStrategy = true;
+    private long cgColdReadThreshold = 3 * 1024 * 1024;
+    private long globalColdReadThreshold = 100 * 1024 * 1024;
 
     public long getMaxPopPollingSize() {
         return maxPopPollingSize;
@@ -1619,6 +1623,38 @@ public class BrokerConfig extends BrokerIdentity {
         this.estimateAccumulation = estimateAccumulation;
     }
 
+    public boolean isColdCtrStrategyEnable() {
+        return coldCtrStrategyEnable;
+    }
+
+    public void setColdCtrStrategyEnable(boolean coldCtrStrategyEnable) {
+        this.coldCtrStrategyEnable = coldCtrStrategyEnable;
+    }
+
+    public boolean isUsePIDColdCtrStrategy() {
+        return usePIDColdCtrStrategy;
+    }
+
+    public void setUsePIDColdCtrStrategy(boolean usePIDColdCtrStrategy) {
+        this.usePIDColdCtrStrategy = usePIDColdCtrStrategy;
+    }
+
+    public long getCgColdReadThreshold() {
+        return cgColdReadThreshold;
+    }
+
+    public void setCgColdReadThreshold(long cgColdReadThreshold) {
+        this.cgColdReadThreshold = cgColdReadThreshold;
+    }
+
+    public long getGlobalColdReadThreshold() {
+        return globalColdReadThreshold;
+    }
+
+    public void setGlobalColdReadThreshold(long globalColdReadThreshold) {
+        this.globalColdReadThreshold = globalColdReadThreshold;
+    }
+
     public boolean isUseStaticSubscription() {
         return useStaticSubscription;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java 
b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 3d6f5d68d0..dc1d69fe10 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -498,4 +498,21 @@ public class MixAll {
         return path.normalize().toString();
     }
 
+    public static boolean isSysConsumerGroupForNoColdReadLimit(String 
consumerGroup) {
+        if (DEFAULT_CONSUMER_GROUP.equals(consumerGroup)
+            || TOOLS_CONSUMER_GROUP.equals(consumerGroup)
+            || SCHEDULE_CONSUMER_GROUP.equals(consumerGroup)
+            || FILTERSRV_CONSUMER_GROUP.equals(consumerGroup)
+            || MONITOR_CONSUMER_GROUP.equals(consumerGroup)
+            || SELF_TEST_CONSUMER_GROUP.equals(consumerGroup)
+            || ONS_HTTP_PROXY_GROUP.equals(consumerGroup)
+            || CID_ONSAPI_PERMISSION_GROUP.equals(consumerGroup)
+            || CID_ONSAPI_OWNER_GROUP.equals(consumerGroup)
+            || CID_ONSAPI_PULL_GROUP.equals(consumerGroup)
+            || CID_SYS_RMQ_TRANS.equals(consumerGroup)
+            || consumerGroup.startsWith(CID_RMQ_SYS_PREFIX)) {
+            return true;
+        }
+        return false;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/coldctr/AccAndTimeStamp.java 
b/common/src/main/java/org/apache/rocketmq/common/coldctr/AccAndTimeStamp.java
new file mode 100644
index 0000000000..212bc08c48
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/coldctr/AccAndTimeStamp.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.coldctr;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AccAndTimeStamp {
+
+    public AtomicLong coldAcc = new AtomicLong(0L);
+    public Long lastColdReadTimeMills = System.currentTimeMillis();
+    public Long createTimeMills = System.currentTimeMillis();
+
+    public AccAndTimeStamp(AtomicLong coldAcc) {
+        this.coldAcc = coldAcc;
+    }
+
+    public AtomicLong getColdAcc() {
+        return coldAcc;
+    }
+
+    public void setColdAcc(AtomicLong coldAcc) {
+        this.coldAcc = coldAcc;
+    }
+
+    public Long getLastColdReadTimeMills() {
+        return lastColdReadTimeMills;
+    }
+
+    public void setLastColdReadTimeMills(Long lastColdReadTimeMills) {
+        this.lastColdReadTimeMills = lastColdReadTimeMills;
+    }
+
+    public Long getCreateTimeMills() {
+        return createTimeMills;
+    }
+
+    public void setCreateTimeMills(Long createTimeMills) {
+        this.createTimeMills = createTimeMills;
+    }
+
+    @Override
+    public String toString() {
+        return "AccAndTimeStamp{" +
+            "coldAcc=" + coldAcc +
+            ", lastColdReadTimeMills=" + lastColdReadTimeMills +
+            ", createTimeMills=" + createTimeMills +
+            '}';
+    }
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/constant/FIleReadaheadMode.java
 
b/common/src/main/java/org/apache/rocketmq/common/constant/FIleReadaheadMode.java
new file mode 100644
index 0000000000..e23a5f5878
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/constant/FIleReadaheadMode.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.constant;
+
+public class FIleReadaheadMode {
+    public static final String READ_AHEAD_MODE = "READ_AHEAD_MODE";
+}
\ No newline at end of file
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java 
b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index a871fc5272..c1176ea153 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -50,4 +50,5 @@ public class LoggerName {
     public static final String STDOUT_LOGGER_NAME = "STDOUT";
     public static final String PROXY_LOGGER_NAME = "RocketmqProxy";
     public static final String PROXY_WATER_MARK_LOGGER_NAME = 
"RocketmqProxyWatermark";
+    public static final String ROCKETMQ_COLDCTR_LOGGER_NAME = 
"RocketmqColdCtr";
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index 9f9a64ed0e..ec87039b41 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -275,6 +275,11 @@ public class RequestCode {
      */
     public static final int CLEAN_BROKER_DATA = 1011;
 
+    public static final int UPDATE_COLD_DATA_FLOW_CTR_CONFIG = 2001;
+    public static final int REMOVE_COLD_DATA_FLOW_CTR_CONFIG = 2002;
+    public static final int GET_COLD_DATA_FLOW_CTR_INFO = 2003;
+    public static final int SET_COMMITLOG_READ_MODE = 2004;
+
     public static final int CONTROLLER_GET_NEXT_BROKER_ID = 1012;
 
     public static final int CONTROLLER_APPLY_BROKER_ID = 1013;
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 56f19529d1..5a5c90c5a0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -27,12 +27,17 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.SystemClock;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.attribute.CQType;
@@ -54,6 +59,8 @@ import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.ha.HAService;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.util.LibC;
+import sun.nio.ch.DirectBuffer;
 
 /**
  * Store all metadata downtime for recovery, data protection reliability
@@ -68,6 +75,7 @@ public class CommitLog implements Swappable {
     protected final DefaultMessageStore defaultMessageStore;
 
     private final FlushManager flushManager;
+    private final ColdDataCheckService coldDataCheckService;
 
     private final AppendMessageCallback appendMessageCallback;
     private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
@@ -101,6 +109,7 @@ public class CommitLog implements Swappable {
         this.defaultMessageStore = messageStore;
 
         this.flushManager = new DefaultFlushManager();
+        this.coldDataCheckService = new ColdDataCheckService();
 
         this.appendMessageCallback = new DefaultAppendMessageCallback();
         putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
@@ -136,6 +145,9 @@ public class CommitLog implements Swappable {
 
     public boolean load() {
         boolean result = this.mappedFileQueue.load();
+        if (result && 
!defaultMessageStore.getMessageStoreConfig().isDataReadAheadEnable()) {
+            scanFileAndSetReadMode(LibC.MADV_RANDOM);
+        }
         this.mappedFileQueue.checkSelf();
         log.info("load commit log " + (result ? "OK" : "Failed"));
         return result;
@@ -146,12 +158,18 @@ public class CommitLog implements Swappable {
         log.info("start commitLog successfully. storeRoot: {}", 
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
         flushDiskWatcher.setDaemon(true);
         flushDiskWatcher.start();
+        if (this.coldDataCheckService != null) {
+            this.coldDataCheckService.start();
+        }
     }
 
     public void shutdown() {
         this.flushManager.shutdown();
         log.info("shutdown commitLog successfully. storeRoot: {}", 
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
         flushDiskWatcher.shutdown(true);
+        if (this.coldDataCheckService != null) {
+            this.coldDataCheckService.shutdown();
+        }
     }
 
     public long flush() {
@@ -901,6 +919,9 @@ public class CommitLog implements Swappable {
 
                 if (null == mappedFile || mappedFile.isFull()) {
                     mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 
Mark: NewFile may be cause noise
+                    if (isCloseReadAhead()) {
+                        setFileReadMode(mappedFile, LibC.MADV_RANDOM);
+                    }
                 }
                 if (null == mappedFile) {
                     log.error("create mapped file1 error, topic: " + 
msg.getTopic() + " clientAddr: " + msg.getBornHostString());
@@ -924,6 +945,9 @@ public class CommitLog implements Swappable {
                             beginTimeInLock = 0;
                             return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                         }
+                        if (isCloseReadAhead()) {
+                            setFileReadMode(mappedFile, LibC.MADV_RANDOM);
+                        }
                         result = mappedFile.appendMessage(msg, 
this.appendMessageCallback, putMessageContext);
                         if 
(AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
                             onCommitLogAppend(msg, result, mappedFile);
@@ -1060,6 +1084,9 @@ public class CommitLog implements Swappable {
 
                 if (null == mappedFile || mappedFile.isFull()) {
                     mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 
Mark: NewFile may be cause noise
+                    if (isCloseReadAhead()) {
+                        setFileReadMode(mappedFile, LibC.MADV_RANDOM);
+                    }
                 }
                 if (null == mappedFile) {
                     log.error("Create mapped file1 error, topic: {} 
clientAddr: {}", messageExtBatch.getTopic(), 
messageExtBatch.getBornHostString());
@@ -1081,6 +1108,9 @@ public class CommitLog implements Swappable {
                             beginTimeInLock = 0;
                             return CompletableFuture.completedFuture(new 
PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                         }
+                        if (isCloseReadAhead()) {
+                            setFileReadMode(mappedFile, LibC.MADV_RANDOM);
+                        }
                         result = mappedFile.appendMessages(messageExtBatch, 
this.appendMessageCallback, putMessageContext);
                         break;
                     case MESSAGE_SIZE_EXCEEDED:
@@ -1236,7 +1266,11 @@ public class CommitLog implements Swappable {
         MappedFile mappedFile = 
this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
         if (mappedFile != null) {
             int pos = (int) (offset % mappedFileSize);
-            return mappedFile.selectMappedBuffer(pos, size);
+            SelectMappedBufferResult selectMappedBufferResult = 
mappedFile.selectMappedBuffer(pos, size);
+            if (null != selectMappedBufferResult) {
+                
selectMappedBufferResult.setInCache(coldDataCheckService.isDataInPageCache(offset));
+                return selectMappedBufferResult;
+            }
         }
         return null;
     }
@@ -2018,4 +2052,207 @@ public class CommitLog implements Swappable {
     public FlushManager getFlushManager() {
         return flushManager;
     }
+
+    private boolean isCloseReadAhead() {
+        return !MixAll.isWindows() && 
!defaultMessageStore.getMessageStoreConfig().isDataReadAheadEnable();
+    }
+
+    public class ColdDataCheckService extends ServiceThread {
+        private final SystemClock systemClock = new SystemClock();
+        private final ConcurrentHashMap<String, byte[]> pageCacheMap = new 
ConcurrentHashMap<>();
+        private int pageSize = -1;
+        private int sampleSteps = 32;
+
+        public ColdDataCheckService() {
+            sampleSteps = 
defaultMessageStore.getMessageStoreConfig().getSampleSteps();
+            if (sampleSteps <= 0) {
+                sampleSteps = 32;
+            }
+            initPageSize();
+            scanFilesInPageCache();
+        }
+
+        @Override
+        public String getServiceName() {
+            return ColdDataCheckService.class.getSimpleName();
+        }
+
+        @Override
+        public void run() {
+            log.info("{} service started", this.getServiceName());
+            while (!this.isStopped()) {
+                try {
+                    if (MixAll.isWindows() || 
!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || 
!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                        pageCacheMap.clear();
+                        this.waitForRunning(180 * 1000);
+                        continue;
+                    } else {
+                        
this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
+                    }
+                    long beginClockTimestamp = this.systemClock.now();
+                    scanFilesInPageCache();
+                    long costTime = this.systemClock.now() - 
beginClockTimestamp;
+                    log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime 
> 30 * 1000 ? "NOTIFYME" : "OK", costTime);
+                } catch (Throwable e) {
+                    log.warn(this.getServiceName() + " service has e: {}", e);
+                }
+            }
+            log.info("{} service end", this.getServiceName());
+        }
+
+        public boolean isDataInPageCache(final long offset) {
+            if 
(!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+                return true;
+            }
+            if (pageSize <= 0 || sampleSteps <= 0) {
+                return true;
+            }
+            if (!defaultMessageStore.checkInColdAreaByCommitOffset(offset, 
getMaxOffset())) {
+                return true;
+            }
+            if 
(!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                return false;
+            }
+
+            MappedFile mappedFile = 
mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+            if (null == mappedFile) {
+                return true;
+            }
+            byte[] bytes = pageCacheMap.get(mappedFile.getFileName());
+            if (null == bytes) {
+                return true;
+            }
+
+            int pos = (int)(offset % 
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
+            int realIndex = pos / pageSize / sampleSteps;
+            return bytes.length - 1 >= realIndex && bytes[realIndex] != 0;
+        }
+
+        private void scanFilesInPageCache() {
+            if (MixAll.isWindows() || 
!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || 
!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable() || pageSize 
<= 0) {
+                return;
+            }
+            try {
+                log.info("pageCacheMap key size: {}", pageCacheMap.size());
+                clearExpireMappedFile();
+                mappedFileQueue.getMappedFiles().forEach(mappedFile -> {
+                    byte[] pageCacheTable = checkFileInPageCache(mappedFile);
+                    if (sampleSteps > 1) {
+                        pageCacheTable = sampling(pageCacheTable, sampleSteps);
+                    }
+                    pageCacheMap.put(mappedFile.getFileName(), pageCacheTable);
+                });
+            } catch (Exception e) {
+                log.error("scanFilesInPageCache exception", e);
+            }
+        }
+
+        private void clearExpireMappedFile() {
+            Set<String> currentFileSet = 
mappedFileQueue.getMappedFiles().stream().map(MappedFile::getFileName).collect(Collectors.toSet());
+            pageCacheMap.forEach((key, value) -> {
+                if (!currentFileSet.contains(key)) {
+                    pageCacheMap.remove(key);
+                    log.info("clearExpireMappedFile fileName: {}, has been 
clear", key);
+                }
+            });
+        }
+
+        private byte[] sampling(byte[] pageCacheTable, int sampleStep) {
+            byte[] sample = new byte[(pageCacheTable.length + sampleStep - 1) 
/ sampleStep];
+            for (int i = 0, j = 0; i < pageCacheTable.length && j < 
sample.length; i += sampleStep) {
+                sample[j++] = pageCacheTable[i];
+            }
+            return sample;
+        }
+
+        private byte[] checkFileInPageCache(MappedFile mappedFile) {
+            long fileSize = mappedFile.getFileSize();
+            final long address = 
((DirectBuffer)mappedFile.getMappedByteBuffer()).address();
+            int pageNums = (int)(fileSize + this.pageSize - 1) / this.pageSize;
+            byte[] pageCacheRst = new byte[pageNums];
+            int mincore = LibC.INSTANCE.mincore(new Pointer(address), new 
NativeLong(fileSize), pageCacheRst);
+            if (mincore != 0) {
+                log.error("checkFileInPageCache call the LibC.INSTANCE.mincore 
error, fileName: {}, fileSize: {}",
+                    mappedFile.getFileName(), fileSize);
+                for (int i = 0; i < pageNums; i++) {
+                    pageCacheRst[i] = 1;
+                }
+            }
+            return pageCacheRst;
+        }
+
+        private void initPageSize() {
+            if (pageSize < 0) {
+                try {
+                    if (!MixAll.isWindows()) {
+                        pageSize = LibC.INSTANCE.getpagesize();
+                    } else {
+                        
defaultMessageStore.getMessageStoreConfig().setColdDataFlowControlEnable(false);
+                        log.info("windows os, coldDataCheckEnable force 
setting to be false");
+                    }
+                    log.info("initPageSize pageSize: {}", pageSize);
+                } catch (Exception e) {
+                    
defaultMessageStore.getMessageStoreConfig().setColdDataFlowControlEnable(false);
+                    log.error("initPageSize error, coldDataCheckEnable force 
setting to be false ", e);
+                }
+            }
+        }
+
+        /**
+         * this result is not high accurate.
+         */
+        public boolean isMsgInColdArea(String group, String topic, int 
queueId, long offset) {
+            if 
(!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+                return false;
+            }
+            try {
+                ConsumeQueue consumeQueue = 
(ConsumeQueue)defaultMessageStore.findConsumeQueue(topic, queueId);
+                if (null == consumeQueue) {
+                    return false;
+                }
+                SelectMappedBufferResult bufferConsumeQueue = 
consumeQueue.getIndexBuffer(offset);
+                if (null == bufferConsumeQueue || null == 
bufferConsumeQueue.getByteBuffer()) {
+                    return false;
+                }
+                long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
+                return 
defaultMessageStore.checkInColdAreaByCommitOffset(offsetPy, getMaxOffset());
+            } catch (Exception e) {
+                log.error("isMsgInColdArea group: {}, topic: {}, queueId: {}, 
offset: {}",
+                    group, topic, queueId, offset, e);
+            }
+            return false;
+        }
+    }
+
+    public void scanFileAndSetReadMode(int mode) {
+        if (MixAll.isWindows()) {
+            log.info("windows os stop scanFileAndSetReadMode");
+            return;
+        }
+        try {
+            log.info("scanFileAndSetReadMode mode: {}", mode);
+            mappedFileQueue.getMappedFiles().forEach(mappedFile -> {
+                setFileReadMode(mappedFile, mode);
+            });
+        } catch (Exception e) {
+            log.error("scanFileAndSetReadMode exception", e);
+        }
+    }
+
+    private int setFileReadMode(MappedFile mappedFile, int mode) {
+        if (null == mappedFile) {
+            log.error("setFileReadMode mappedFile is null");
+            return -1;
+        }
+        final long address = 
((DirectBuffer)mappedFile.getMappedByteBuffer()).address();
+        int madvise = LibC.INSTANCE.madvise(new Pointer(address), new 
NativeLong(mappedFile.getFileSize()), mode);
+        if (madvise != 0) {
+            log.error("setFileReadMode error fileName: {}, madvise: {}, 
mode:{}", mappedFile.getFileName(), madvise, mode);
+        }
+        return madvise;
+    }
+
+    public ColdDataCheckService getColdDataCheckService() {
+        return coldDataCheckService;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 6b0516b04f..acc1610e08 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -886,6 +886,10 @@ public class DefaultMessageStore implements MessageStore {
                                 continue;
                             }
 
+                            if 
(messageStoreConfig.isColdDataFlowControlEnable() && 
!MixAll.isSysConsumerGroupForNoColdReadLimit(group) && 
!selectResult.isInCache()) {
+                                
getResult.setColdDataSum(getResult.getColdDataSum() + sizePy);
+                            }
+
                             if (messageFilter != null
                                 && 
!messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), 
null)) {
                                 if (getResult.getBufferTotalSize() == 0) {
@@ -1705,6 +1709,16 @@ public class DefaultMessageStore implements MessageStore 
{
         return offsetPy >= commitLog.getMinOffset();
     }
 
+    /**
+     * The ratio val is estimated by the experiment and experience
+     * so that the result is not high accurate for different business
+     * @return
+     */
+    public boolean checkInColdAreaByCommitOffset(long offsetPy, long 
maxOffsetPy) {
+        long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * 
(this.messageStoreConfig.getAccessMessageInMemoryHotRatio() / 100.0));
+        return (maxOffsetPy - offsetPy) > memory;
+    }
+
     private boolean isTheBatchFull(int sizePy, int unitBatchNum, int 
maxMsgNums, long maxMsgSize, int bufferTotal,
         int messageTotal, boolean isInMem) {
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java 
b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
index 724ffdd878..a7556dfb85 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
@@ -41,6 +41,8 @@ public class GetMessageResult {
     private int msgCount4Commercial = 0;
     private int commercialSizePerMsg = 4 * 1024;
 
+    private long coldDataSum = 0L;
+
     public static final GetMessageResult NO_MATCH_LOGIC_QUEUE =
         new GetMessageResult(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, 0, 0, 0, 
Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList());
@@ -167,6 +169,14 @@ public class GetMessageResult {
         return messageQueueOffset;
     }
 
+    public long getColdDataSum() {
+        return coldDataSum;
+    }
+
+    public void setColdDataSum(long coldDataSum) {
+        this.coldDataSum = coldDataSum;
+    }
+
     @Override
     public String toString() {
         return "GetMessageResult [status=" + status + ", nextBeginOffset=" + 
nextBeginOffset + ", minOffset="
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java 
b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
index 317f536058..5c38cfe92a 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
@@ -29,6 +29,8 @@ public class SelectMappedBufferResult {
 
     protected MappedFile mappedFile;
 
+    private boolean isInCache = true;
+
     public SelectMappedBufferResult(long startOffset, ByteBuffer byteBuffer, 
int size, MappedFile mappedFile) {
         this.startOffset = startOffset;
         this.byteBuffer = byteBuffer;
@@ -74,4 +76,12 @@ public class SelectMappedBufferResult {
         long pos = startOffset - mappedFile.getFileFromOffset();
         return mappedFile.isLoaded(pos, size);
     }
+
+    public boolean isInCache() {
+        return isInCache;
+    }
+
+    public void setInCache(boolean inCache) {
+        isInCache = inCache;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index f5ad70543c..099be93051 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -379,6 +379,12 @@ public class MessageStoreConfig {
      */
     private int sampleCountThreshold = 5000;
 
+    private boolean coldDataFlowControlEnable = false;
+    private boolean coldDataScanEnable = false;
+    private boolean dataReadAheadEnable = false;
+    private int timerColdDataCheckIntervalMs = 60 * 1000;
+    private int sampleSteps = 32;
+    private int accessMessageInMemoryHotRatio = 26;
     /**
      * Build ConsumeQueue concurrently with multi-thread
      */
@@ -1641,6 +1647,54 @@ public class MessageStoreConfig {
         this.sampleCountThreshold = sampleCountThreshold;
     }
 
+    public boolean isColdDataFlowControlEnable() {
+        return coldDataFlowControlEnable;
+    }
+
+    public void setColdDataFlowControlEnable(boolean 
coldDataFlowControlEnable) {
+        this.coldDataFlowControlEnable = coldDataFlowControlEnable;
+    }
+
+    public boolean isColdDataScanEnable() {
+        return coldDataScanEnable;
+    }
+
+    public void setColdDataScanEnable(boolean coldDataScanEnable) {
+        this.coldDataScanEnable = coldDataScanEnable;
+    }
+
+    public int getTimerColdDataCheckIntervalMs() {
+        return timerColdDataCheckIntervalMs;
+    }
+
+    public void setTimerColdDataCheckIntervalMs(int 
timerColdDataCheckIntervalMs) {
+        this.timerColdDataCheckIntervalMs = timerColdDataCheckIntervalMs;
+    }
+
+    public int getSampleSteps() {
+        return sampleSteps;
+    }
+
+    public void setSampleSteps(int sampleSteps) {
+        this.sampleSteps = sampleSteps;
+    }
+
+    public int getAccessMessageInMemoryHotRatio() {
+        return accessMessageInMemoryHotRatio;
+    }
+
+    public void setAccessMessageInMemoryHotRatio(int 
accessMessageInMemoryHotRatio) {
+        this.accessMessageInMemoryHotRatio = accessMessageInMemoryHotRatio;
+    }
+
+    public boolean isDataReadAheadEnable() {
+        return dataReadAheadEnable;
+    }
+
+    public void setDataReadAheadEnable(boolean dataReadAheadEnable) {
+        this.dataReadAheadEnable = dataReadAheadEnable;
+    }
+
     public boolean isEnableBuildConsumeQueueConcurrently() {
         return enableBuildConsumeQueueConcurrently;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/LibC.java 
b/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
index 8eaa44ab46..4c8e7d4538 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
+++ b/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
@@ -25,6 +25,8 @@ import com.sun.jna.Pointer;
 public interface LibC extends Library {
     LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" 
: "c", LibC.class);
 
+    int MADV_NORMAL = 0;
+    int MADV_RANDOM = 1;
     int MADV_WILLNEED = 3;
     int MADV_DONTNEED = 4;
 
@@ -50,4 +52,8 @@ public interface LibC extends Library {
     int mlockall(int flags);
 
     int msync(Pointer p, NativeLong length, int flags);
+
+    int mincore(Pointer p, NativeLong length, byte[] vec);
+
+    int getpagesize();
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 2ca7d667bb..dd9c6a9b45 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -844,4 +844,32 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
         String brokerControllerIdsToClean, boolean isCleanLivingBroker) throws 
RemotingException, InterruptedException, MQBrokerException {
         this.defaultMQAdminExtImpl.cleanControllerBrokerData(controllerAddr, 
clusterName, brokerName, brokerControllerIdsToClean, isCleanLivingBroker);
     }
+
+    @Override
+    public void updateColdDataFlowCtrGroupConfig(String brokerAddr, Properties 
properties)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        defaultMQAdminExtImpl.updateColdDataFlowCtrGroupConfig(brokerAddr, 
properties);
+    }
+
+    @Override
+    public void removeColdDataFlowCtrGroupConfig(String brokerAddr, String 
consumerGroup)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        defaultMQAdminExtImpl.removeColdDataFlowCtrGroupConfig(brokerAddr, 
consumerGroup);
+    }
+
+    @Override
+    public String getColdDataFlowCtrInfo(String brokerAddr)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExtImpl.getColdDataFlowCtrInfo(brokerAddr);
+    }
+
+    @Override
+    public String setCommitLogReadAheadMode(String brokerAddr, String mode)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        InterruptedException, MQBrokerException {
+        return defaultMQAdminExtImpl.setCommitLogReadAheadMode(brokerAddr, 
mode);
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 1d4b1bbfc7..c5c467bf00 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1905,4 +1905,31 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
     public MQClientInstance getMqClientInstance() {
         return mqClientInstance;
     }
+
+    @Override
+    public void updateColdDataFlowCtrGroupConfig(String brokerAddr, Properties 
properties)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        
this.mqClientInstance.getMQClientAPIImpl().updateColdDataFlowCtrGroupConfig(brokerAddr,
 properties, timeoutMillis);
+    }
+
+    @Override
+    public void removeColdDataFlowCtrGroupConfig(String brokerAddr, String 
consumerGroup)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        
this.mqClientInstance.getMQClientAPIImpl().removeColdDataFlowCtrGroupConfig(brokerAddr,
 consumerGroup, timeoutMillis);
+    }
+
+    @Override
+    public String getColdDataFlowCtrInfo(String brokerAddr)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().getColdDataFlowCtrInfo(brokerAddr, 
timeoutMillis);
+    }
+
+    @Override
+    public String setCommitLogReadAheadMode(String brokerAddr, String mode)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQBrokerException {
+        return 
this.mqClientInstance.getMQClientAPIImpl().setCommitLogReadAheadMode(brokerAddr,
 mode, timeoutMillis);
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 01f985496c..7dcfc4fa5e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -473,4 +473,16 @@ public interface MQAdminExt extends MQAdmin {
         String brokerControllerIdsToClean,
         boolean isCleanLivingBroker) throws RemotingException, 
InterruptedException, MQBrokerException;
 
+    void updateColdDataFlowCtrGroupConfig(final String brokerAddr, final 
Properties properties)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, 
MQBrokerException;
+
+    void removeColdDataFlowCtrGroupConfig(final String brokerAddr, final 
String consumerGroup)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, 
MQBrokerException;
+
+    String getColdDataFlowCtrInfo(final String brokerAddr)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, 
MQBrokerException;
+
+    String setCommitLogReadAheadMode(final String brokerAddr, String mode)
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, 
MQBrokerException;
+
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java 
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index fd64d69c73..0c2618e91c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -36,12 +36,16 @@ import 
org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad;
 import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand;
 import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
 import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand;
+import 
org.apache.rocketmq.tools.command.broker.CommitLogSetReadAheadSubCommand;
 import 
org.apache.rocketmq.tools.command.broker.DeleteExpiredCommitLogSubCommand;
 import org.apache.rocketmq.tools.command.broker.GetBrokerConfigCommand;
 import org.apache.rocketmq.tools.command.broker.GetBrokerEpochSubCommand;
+import 
org.apache.rocketmq.tools.command.broker.GetColdDataFlowCtrInfoSubCommand;
+import 
org.apache.rocketmq.tools.command.broker.RemoveColdDataFlowCtrGroupConfigSubCommand;
 import 
org.apache.rocketmq.tools.command.broker.ResetMasterFlushOffsetSubCommand;
 import org.apache.rocketmq.tools.command.broker.SendMsgStatusCommand;
 import org.apache.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand;
+import 
org.apache.rocketmq.tools.command.broker.UpdateColdDataFlowCtrGroupConfigSubCommand;
 import org.apache.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand;
 import org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand;
 import 
org.apache.rocketmq.tools.command.connection.ConsumerConnectionSubCommand;
@@ -263,6 +267,11 @@ public class MQAdminStartup {
         initCommand(new ReElectMasterSubCommand());
         initCommand(new CleanControllerBrokerMetaSubCommand());
         initCommand(new DumpCompactionLogCommand());
+
+        initCommand(new GetColdDataFlowCtrInfoSubCommand());
+        initCommand(new UpdateColdDataFlowCtrGroupConfigSubCommand());
+        initCommand(new RemoveColdDataFlowCtrGroupConfigSubCommand());
+        initCommand(new CommitLogSetReadAheadSubCommand());
     }
 
     private static void printHelp() {
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java
new file mode 100644
index 0000000000..b00c7f5f58
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class CommitLogSetReadAheadSubCommand implements SubCommand {
+    private static final String MADV_RANDOM = "1";
+    private static final String MADV_NORMAL = "0";
+    @Override
+    public String commandName() {
+        return "setCommitLogReadAheadMode";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "set read ahead mode for all commitlog files";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "set which broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "set which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("m", "commitLogReadAheadMode", true, "set the 
CommitLog read ahead mode; 0 is default, 1 random read");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook 
rpcHook)
+        throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            String mode = commandLine.getOptionValue('m').trim();
+            if (!mode.equals(MADV_RANDOM) && !mode.equals(MADV_NORMAL)) {
+                System.out.printf("set the read mode error; 0 is default, 1 
random read\n");
+                return;
+            }
+            if (commandLine.hasOption('b')) {
+                String brokerAddr = commandLine.getOptionValue('b').trim();
+                defaultMQAdminExt.start();
+                setAndPrint(defaultMQAdminExt, 
String.format("============%s============\n", brokerAddr), brokerAddr, mode);
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                defaultMQAdminExt.start();
+                Map<String, List<String>> masterAndSlaveMap = 
CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
+                for (String masterAddr : masterAndSlaveMap.keySet()) {
+                    setAndPrint(defaultMQAdminExt, 
String.format("============Master: %s============\n", masterAddr), masterAddr, 
mode);
+                    for (String slaveAddr : masterAndSlaveMap.get(masterAddr)) 
{
+                        setAndPrint(defaultMQAdminExt, 
String.format("============My Master: %s=====Slave: %s============\n", 
masterAddr, slaveAddr), slaveAddr, mode);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    protected void setAndPrint(final MQAdminExt defaultMQAdminExt, final 
String printPrefix, final String addr, final String mode)
+        throws InterruptedException, RemotingConnectException, 
UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, 
RemotingSendRequestException {
+        System.out.print(" " + printPrefix);
+        System.out.printf("commitLog set readAhead mode rstStr" + 
defaultMQAdminExt.setCommitLogReadAheadMode(addr, mode) + "\n");
+    }
+}
\ No newline at end of file
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java
new file mode 100644
index 0000000000..7c54e650c3
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.io.UnsupportedEncodingException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class GetColdDataFlowCtrInfoSubCommand implements SubCommand {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    @Override
+    public String commandName() {
+        return "getColdDataFlowCtrInfo";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "get cold data flow ctr info";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "get from which 
broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "get from which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, 
final RPCHook rpcHook)
+        throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            if (commandLine.hasOption('b')) {
+                String brokerAddr = commandLine.getOptionValue('b').trim();
+                defaultMQAdminExt.start();
+                getAndPrint(defaultMQAdminExt, 
String.format("============%s============\n", brokerAddr), brokerAddr);
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                defaultMQAdminExt.start();
+                Map<String, List<String>> masterAndSlaveMap = 
CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
+                for (String masterAddr : masterAndSlaveMap.keySet()) {
+                    getAndPrint(defaultMQAdminExt, 
String.format("============Master: %s============\n", masterAddr), masterAddr);
+                    for (String slaveAddr : masterAndSlaveMap.get(masterAddr)) 
{
+                        getAndPrint(defaultMQAdminExt, 
String.format("============My Master: %s=====Slave: %s============\n", 
masterAddr, slaveAddr), slaveAddr);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    protected void getAndPrint(final MQAdminExt defaultMQAdminExt, final 
String printPrefix, final String addr)
+        throws InterruptedException, RemotingConnectException,
+        UnsupportedEncodingException, RemotingTimeoutException,
+        MQBrokerException, RemotingSendRequestException {
+
+        System.out.print(" " + printPrefix);
+        String rstStr = defaultMQAdminExt.getColdDataFlowCtrInfo(addr);
+        if (rstStr == null) {
+            System.out.printf("Broker[%s] has no cold ctr table !\n", addr);
+            return;
+        }
+        JSONObject jsonObject = JSON.parseObject(rstStr);
+        Map<String, JSONObject> runtimeTable = (Map<String, 
JSONObject>)jsonObject.get("runtimeTable");
+        runtimeTable.entrySet().stream().forEach(i -> {
+            JSONObject value = i.getValue();
+            Date lastColdReadTimeMillsDate = new 
Date(Long.parseLong(String.valueOf(value.get("lastColdReadTimeMills"))));
+            value.put("lastColdReadTimeFormat", 
sdf.format(lastColdReadTimeMillsDate));
+            value.remove("lastColdReadTimeMills");
+
+            Date createTimeMillsDate = new 
Date(Long.parseLong(String.valueOf(value.get("createTimeMills"))));
+            value.put("createTimeFormat", sdf.format(createTimeMillsDate));
+            value.remove("createTimeMills");
+        });
+
+        String formatStr = JSON.toJSONString(jsonObject, true);
+        System.out.printf(formatStr);
+        System.out.printf("%n");
+    }
+
+}
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java
new file mode 100644
index 0000000000..b0477924f2
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class RemoveColdDataFlowCtrGroupConfigSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "removeColdDataFlowCtrGroupConfig";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "remove consumer from cold ctr config";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "update which 
broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "update which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "consumerGroup", true, "the consumer group will 
remove from the config");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook 
rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            String consumerGroup = commandLine.getOptionValue('g').trim();
+            if (commandLine.hasOption('b')) {
+                String brokerAddr = commandLine.getOptionValue('b').trim();
+                defaultMQAdminExt.start();
+                defaultMQAdminExt.removeColdDataFlowCtrGroupConfig(brokerAddr, 
consumerGroup);
+                System.out.printf("remove broker cold read threshold success, 
%s\n", brokerAddr);
+                return;
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                defaultMQAdminExt.start();
+                Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String brokerAddr : masterSet) {
+                    try {
+                        
defaultMQAdminExt.removeColdDataFlowCtrGroupConfig(brokerAddr, consumerGroup);
+                        System.out.printf("remove broker cold read threshold 
success, %s\n", brokerAddr);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+                return;
+            }
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), 
options);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java
new file mode 100644
index 0000000000..d06a24b579
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class UpdateColdDataFlowCtrGroupConfigSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "updateColdDataFlowCtrGroupConfig";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "addOrUpdate cold data flow ctr group config";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "update which 
broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "update which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "consumerGroup", true, "specific consumerGroup");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("v", "threshold", true, "cold read threshold value");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook 
rpcHook) throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            String key = commandLine.getOptionValue('g').trim();
+            String value = commandLine.getOptionValue('v').trim();
+            Properties properties = new Properties();
+            properties.put(key, value);
+
+            if (commandLine.hasOption('b')) {
+                String brokerAddr = commandLine.getOptionValue('b').trim();
+                defaultMQAdminExt.start();
+                defaultMQAdminExt.updateColdDataFlowCtrGroupConfig(brokerAddr, 
properties);
+                System.out.printf("updateColdDataFlowCtrGroupConfig success, 
%s\n", brokerAddr);
+                return;
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                defaultMQAdminExt.start();
+                Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String brokerAddr : masterSet) {
+                    try {
+                        
defaultMQAdminExt.updateColdDataFlowCtrGroupConfig(brokerAddr, properties);
+                        System.out.printf("updateColdDataFlowCtrGroupConfig 
success, %s\n", brokerAddr);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+                return;
+            }
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), 
options);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
\ No newline at end of file

Reply via email to