This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new bee5077bcb [ISSUE #6336] [RIP-62] Cold Read Control (#6507)
bee5077bcb is described below
commit bee5077bcb77411f103aafb2220184f59db2c95e
Author: Drizzle <[email protected]>
AuthorDate: Mon Jun 5 17:09:31 2023 +0800
[ISSUE #6336] [RIP-62] Cold Read Control (#6507)
---
.../apache/rocketmq/broker/BrokerController.java | 38 ++++
.../rocketmq/broker/coldctr/ColdCtrStrategy.java | 42 ++++
.../broker/coldctr/ColdDataCgCtrService.java | 250 +++++++++++++++++++++
.../coldctr/ColdDataPullRequestHoldService.java | 105 +++++++++
.../broker/coldctr/PIDAdaptiveColdCtrStrategy.java | 85 +++++++
.../broker/coldctr/SimpleColdCtrStrategy.java | 51 +++++
.../broker/processor/AdminBrokerProcessor.java | 135 +++++++++++
.../broker/processor/PullMessageProcessor.java | 38 +++-
broker/src/main/resources/rmq.broker.logback.xml | 33 +++
.../rocketmq/client/impl/MQClientAPIImpl.java | 77 +++++++
.../org/apache/rocketmq/common/BrokerConfig.java | 36 +++
.../java/org/apache/rocketmq/common/MixAll.java | 17 ++
.../rocketmq/common/coldctr/AccAndTimeStamp.java | 63 ++++++
.../common/constant/FIleReadaheadMode.java | 21 ++
.../rocketmq/common/constant/LoggerName.java | 1 +
.../rocketmq/remoting/protocol/RequestCode.java | 5 +
.../java/org/apache/rocketmq/store/CommitLog.java | 239 +++++++++++++++++++-
.../apache/rocketmq/store/DefaultMessageStore.java | 14 ++
.../apache/rocketmq/store/GetMessageResult.java | 10 +
.../rocketmq/store/SelectMappedBufferResult.java | 10 +
.../rocketmq/store/config/MessageStoreConfig.java | 54 +++++
.../java/org/apache/rocketmq/store/util/LibC.java | 6 +
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 28 +++
.../tools/admin/DefaultMQAdminExtImpl.java | 27 +++
.../apache/rocketmq/tools/admin/MQAdminExt.java | 12 +
.../rocketmq/tools/command/MQAdminStartup.java | 9 +
.../broker/CommitLogSetReadAheadSubCommand.java | 106 +++++++++
.../broker/GetColdDataFlowCtrInfoSubCommand.java | 124 ++++++++++
...RemoveColdDataFlowCtrGroupConfigSubCommand.java | 93 ++++++++
...UpdateColdDataFlowCtrGroupConfigSubCommand.java | 103 +++++++++
30 files changed, 1827 insertions(+), 5 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8560bde9fa..73da996ae8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -51,6 +51,8 @@ import
org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
+import org.apache.rocketmq.broker.coldctr.ColdDataCgCtrService;
+import org.apache.rocketmq.broker.coldctr.ColdDataPullRequestHoldService;
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.failover.EscapeBridge;
@@ -265,6 +267,8 @@ public class BrokerController {
protected ReplicasManager replicasManager;
private long lastSyncTimeMs = System.currentTimeMillis();
private BrokerMetricsManager brokerMetricsManager;
+ private ColdDataPullRequestHoldService coldDataPullRequestHoldService;
+ private ColdDataCgCtrService coldDataCgCtrService;
public BrokerController(
final BrokerConfig brokerConfig,
@@ -321,6 +325,8 @@ public class BrokerController {
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new
LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.scheduleMessageService = new ScheduleMessageService(this);
+ this.coldDataPullRequestHoldService = new
ColdDataPullRequestHoldService(this);
+ this.coldDataCgCtrService = new ColdDataCgCtrService(this);
if (nettyClientConfig != null) {
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
@@ -1403,6 +1409,14 @@ public class BrokerController {
this.brokerPreOnlineService.shutdown();
}
+ if (this.coldDataPullRequestHoldService != null) {
+ this.coldDataPullRequestHoldService.shutdown();
+ }
+
+ if (this.coldDataCgCtrService != null) {
+ this.coldDataCgCtrService.shutdown();
+ }
+
shutdownScheduledExecutorService(this.syncBrokerMemberGroupExecutorService);
shutdownScheduledExecutorService(this.brokerHeartbeatExecutorService);
@@ -1547,6 +1561,13 @@ public class BrokerController {
this.brokerPreOnlineService.start();
}
+ if (this.coldDataPullRequestHoldService != null) {
+ this.coldDataPullRequestHoldService.start();
+ }
+
+ if (this.coldDataCgCtrService != null) {
+ this.coldDataCgCtrService.start();
+ }
}
public void start() throws Exception {
@@ -2297,4 +2318,21 @@ public class BrokerController {
public BlockingQueue<Runnable> getAdminBrokerThreadPoolQueue() {
return adminBrokerThreadPoolQueue;
}
+
+ /**
+ * Returns the service that parks pull requests which are reading cold data.
+ *
+ * @return the ColdDataPullRequestHoldService currently held by this controller
+ */
+ public ColdDataPullRequestHoldService getColdDataPullRequestHoldService() {
+ return coldDataPullRequestHoldService;
+ }
+
+ /**
+ * Replaces the cold-data pull request hold service held by this controller.
+ * Only the field is reassigned here; the previous instance is not shut down
+ * and the new one is not started by this method.
+ *
+ * @param coldDataPullRequestHoldService the service instance to use from now on
+ */
+ public void setColdDataPullRequestHoldService(
+ ColdDataPullRequestHoldService coldDataPullRequestHoldService) {
+ this.coldDataPullRequestHoldService = coldDataPullRequestHoldService;
+ }
+
+ /**
+ * Returns the service that tracks cold-read accumulation per consumer group.
+ *
+ * @return the ColdDataCgCtrService currently held by this controller
+ */
+ public ColdDataCgCtrService getColdDataCgCtrService() {
+ return coldDataCgCtrService;
+ }
+
+ /**
+ * Replaces the consumer-group cold-read control service held by this controller.
+ * Only the field is reassigned here; the previous instance is not shut down
+ * and the new one is not started by this method.
+ *
+ * @param coldDataCgCtrService the service instance to use from now on
+ */
+ public void setColdDataCgCtrService(ColdDataCgCtrService
coldDataCgCtrService) {
+ this.coldDataCgCtrService = coldDataCgCtrService;
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdCtrStrategy.java
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdCtrStrategy.java
new file mode 100644
index 0000000000..11fa0e707f
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdCtrStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+public interface ColdCtrStrategy {
+ /**
+ * Computes the factor that decides whether cold-read thresholds may be
+ * raised or lowered. Within this patch, PIDAdaptiveColdCtrStrategy promotes
+ * only when the factor is positive and decelerates only when it is negative.
+ *
+ * @return the decision factor; may be {@code null} (SimpleColdCtrStrategy
+ * returns null), so do not unbox the result without a null check
+ */
+ Double decisionFactor();
+ /**
+ * Raises the cold-read threshold stored for the given key, i.e. lets the
+ * consumer group read cold data faster.
+ *
+ * @param consumerGroup key under which the new threshold is stored
+ * (ColdDataCgCtrService passes the group name with its adaptive suffix)
+ * @param currentThreshold threshold currently in effect; the new value is derived from it
+ */
+ void promote(String consumerGroup, Long currentThreshold);
+ /**
+ * Lowers the cold-read threshold stored for the given key, i.e. makes the
+ * consumer group read cold data more slowly.
+ *
+ * @param consumerGroup key under which the new threshold is stored
+ * (ColdDataCgCtrService passes a key of its threshold config table)
+ * @param currentThreshold threshold currently in effect; the new value is derived from it
+ */
+ void decelerate(String consumerGroup, Long currentThreshold);
+ /**
+ * Feeds the strategy with the total amount of cold data read in the system
+ * since the accumulator was last reset. ColdDataCgCtrService calls this once
+ * per clearing round, before the global accumulator is set back to zero.
+ *
+ * @param globalAcc the globally accumulated cold-read amount
+ */
+ void collect(Long globalAcc);
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
new file mode 100644
index 0000000000..dd9278fb75
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.alibaba.fastjson.JSONObject;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.SystemClock;
+import org.apache.rocketmq.common.coldctr.AccAndTimeStamp;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+/**
+ * Keeps the consumer group (cg) cold-read control tables and accumulates the size of the cold
+ * data each group reads. A background loop periodically evicts groups that stopped reading cold
+ * data, resets every accumulator to zero and, when the adaptive strategy is enabled, lets the
+ * strategy raise or lower the per-group thresholds.
+ */
+public class ColdDataCgCtrService extends ServiceThread {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_COLDCTR_LOGGER_NAME);
+    private final SystemClock systemClock = new SystemClock();
+    /** How long a group stays in the runtime table after its last cold read. */
+    private final long cgColdAccResideTimeoutMills = 60 * 1000;
+    /** Cold data read by all groups since the last clearing round (static, i.e. shared by all instances). */
+    private static final AtomicLong GLOBAL_ACC = new AtomicLong(0L);
+    /** Suffix that marks a config-table key as maintained by the adaptive strategy rather than by an admin. */
+    private static final String ADAPTIVE = "||adaptive";
+    /** Upper bound of thresholds the strategy may lower within one clearing round. */
+    private static final int MAX_DECELERATE_PER_ROUND = 3;
+    /**
+     * As soon as a consumer group reads cold data it is put into this table; it is removed again
+     * once it has not read cold data for {@code cgColdAccResideTimeoutMills}.
+     */
+    private final ConcurrentHashMap<String, AccAndTimeStamp> cgColdThresholdMapRuntime = new ConcurrentHashMap<>();
+    /**
+     * Thresholds that override the broker-wide default. Keys without the {@code ADAPTIVE} suffix
+     * are set by the system admin; keys with the suffix are maintained by the strategy.
+     */
+    private final ConcurrentHashMap<String, Long> cgColdThresholdMapConfig = new ConcurrentHashMap<>();
+    private final BrokerConfig brokerConfig;
+    private final MessageStoreConfig messageStoreConfig;
+    private final ColdCtrStrategy coldCtrStrategy;
+
+    /**
+     * Creates the service and selects the threshold strategy from the broker configuration.
+     * The PID strategy is given 80% of the global cold-read threshold as its target value.
+     *
+     * @param brokerController controller that supplies the broker and message store configuration
+     */
+    public ColdDataCgCtrService(BrokerController brokerController) {
+        this.brokerConfig = brokerController.getBrokerConfig();
+        this.messageStoreConfig = brokerController.getMessageStoreConfig();
+        this.coldCtrStrategy = brokerConfig.isUsePIDColdCtrStrategy()
+            ? new PIDAdaptiveColdCtrStrategy(this, (long) (brokerConfig.getGlobalColdReadThreshold() * 0.8))
+            : new SimpleColdCtrStrategy(this);
+    }
+
+    @Override
+    public String getServiceName() {
+        return ColdDataCgCtrService.class.getSimpleName();
+    }
+
+    /**
+     * Service loop: waits 5 s (flow control enabled) or 180 s (disabled) between rounds, then
+     * clears the accumulators. Adaptive thresholds are dropped while the strategy is disabled.
+     */
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                if (messageStoreConfig.isColdDataFlowControlEnable()) {
+                    this.waitForRunning(5 * 1000);
+                } else {
+                    this.waitForRunning(180 * 1000);
+                }
+                long beginLockTimestamp = this.systemClock.now();
+                clearDataAcc();
+                if (!brokerConfig.isColdCtrStrategyEnable()) {
+                    clearAdaptiveConfig();
+                }
+                long costTime = this.systemClock.now() - beginLockTimestamp;
+                log.info("[{}] clearTheDataAcc-cost {} ms.", costTime > 3 * 1000 ? "NOTIFYME" : "OK", costTime);
+            } catch (Throwable e) {
+                log.warn(this.getServiceName() + " service has exception", e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    /**
+     * Dumps both tables, the configured thresholds and the current global accumulator as JSON.
+     *
+     * @return JSON string describing the cold data flow control state
+     */
+    public String getColdDataFlowCtrInfo() {
+        JSONObject result = new JSONObject();
+        result.put("runtimeTable", this.cgColdThresholdMapRuntime);
+        result.put("configTable", this.cgColdThresholdMapConfig);
+        result.put("cgColdReadThreshold", this.brokerConfig.getCgColdReadThreshold());
+        result.put("globalColdReadThreshold", this.brokerConfig.getGlobalColdReadThreshold());
+        result.put("globalAcc", GLOBAL_ACC.get());
+        return result.toJSONString();
+    }
+
+    /**
+     * Removes the groups that have not read cold data for a long time, resets the accumulator of
+     * every group to zero, and uses the strategy to promote or decelerate groups.
+     */
+    private void clearDataAcc() {
+        log.info("clearDataAcc cgColdThresholdMapRuntime key size: {}", cgColdThresholdMapRuntime.size());
+        if (brokerConfig.isColdCtrStrategyEnable()) {
+            coldCtrStrategy.collect(GLOBAL_ACC.get());
+        }
+        Iterator<Entry<String, AccAndTimeStamp>> iterator = cgColdThresholdMapRuntime.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Entry<String, AccAndTimeStamp> next = iterator.next();
+            String consumerGroup = next.getKey();
+            AccAndTimeStamp accAndTimeStamp = next.getValue();
+            if (System.currentTimeMillis() >= cgColdAccResideTimeoutMills + accAndTimeStamp.getLastColdReadTimeMills()) {
+                // The group went idle: forget it together with its adaptive threshold.
+                if (brokerConfig.isColdCtrStrategyEnable()) {
+                    cgColdThresholdMapConfig.remove(buildAdaptiveKey(consumerGroup));
+                }
+                iterator.remove();
+            } else {
+                Long threshold = getThresholdByConsumerGroup(consumerGroup);
+                long acc = accAndTimeStamp.getColdAcc().get();
+                if (acc >= threshold) {
+                    log.info("Coldctr consumerGroup: {}, acc: {}, threshold: {}", consumerGroup, acc, threshold);
+                    // Only groups without an admin threshold are promoted, and only while the
+                    // broker as a whole is below its global threshold.
+                    if (brokerConfig.isColdCtrStrategyEnable() && !isGlobalColdCtr() && !isAdminConfig(consumerGroup)) {
+                        coldCtrStrategy.promote(buildAdaptiveKey(consumerGroup), threshold);
+                    }
+                }
+            }
+            accAndTimeStamp.getColdAcc().set(0L);
+        }
+        if (isGlobalColdCtr()) {
+            log.info("Coldctr global acc: {}, threshold: {}", GLOBAL_ACC.get(), this.brokerConfig.getGlobalColdReadThreshold());
+        }
+        if (brokerConfig.isColdCtrStrategyEnable()) {
+            sortAndDecelerate();
+        }
+        GLOBAL_ACC.set(0L);
+    }
+
+    /**
+     * Walks the configured thresholds from the largest to the smallest and asks the strategy to
+     * decelerate at most {@code MAX_DECELERATE_PER_ROUND} entries that are not admin-configured.
+     */
+    private void sortAndDecelerate() {
+        List<Entry<String, Long>> configMapList = new ArrayList<>(cgColdThresholdMapConfig.entrySet());
+        // Compare the long values directly: casting their difference to int can overflow and
+        // flip the order for large thresholds.
+        configMapList.sort(Entry.comparingByValue(Comparator.reverseOrder()));
+        int remaining = MAX_DECELERATE_PER_ROUND;
+        for (Entry<String, Long> entry : configMapList) {
+            if (remaining <= 0) {
+                break;
+            }
+            if (!isAdminConfig(entry.getKey())) {
+                coldCtrStrategy.decelerate(entry.getKey(), getThresholdByConsumerGroup(entry.getKey()));
+                remaining--;
+            }
+        }
+    }
+
+    /**
+     * Adds an amount of cold data read by a consumer group to the group and global accumulators.
+     * Non-positive amounts are ignored.
+     *
+     * @param consumerGroup group that read the cold data
+     * @param coldDataToAcc amount of cold data to accumulate
+     */
+    public void coldAcc(String consumerGroup, long coldDataToAcc) {
+        if (coldDataToAcc <= 0) {
+            return;
+        }
+        GLOBAL_ACC.addAndGet(coldDataToAcc);
+        AccAndTimeStamp atomicAcc = cgColdThresholdMapRuntime.get(consumerGroup);
+        if (null == atomicAcc) {
+            // The new entry already carries this amount; putIfAbsent returns null when it wins
+            // the race, otherwise it returns the entry another thread inserted first.
+            atomicAcc = new AccAndTimeStamp(new AtomicLong(coldDataToAcc));
+            atomicAcc = cgColdThresholdMapRuntime.putIfAbsent(consumerGroup, atomicAcc);
+        }
+        if (null != atomicAcc) {
+            atomicAcc.getColdAcc().addAndGet(coldDataToAcc);
+            atomicAcc.setLastColdReadTimeMills(System.currentTimeMillis());
+        }
+    }
+
+    /**
+     * Stores or replaces the threshold under the given config-table key.
+     *
+     * @param consumerGroup config-table key (a group name, optionally with the adaptive suffix)
+     * @param threshold threshold to store
+     */
+    public void addOrUpdateGroupConfig(String consumerGroup, Long threshold) {
+        cgColdThresholdMapConfig.put(consumerGroup, threshold);
+    }
+
+    /**
+     * Removes the threshold stored under the given config-table key, if any.
+     *
+     * @param consumerGroup config-table key to remove
+     */
+    public void removeGroupConfig(String consumerGroup) {
+        cgColdThresholdMapConfig.remove(consumerGroup);
+    }
+
+    /**
+     * Tells whether cold reads of the given group must be flow-controlled right now.
+     *
+     * @param consumerGroup group to check
+     * @return {@code true} when flow control is enabled, the group is not exempt, it has read cold
+     * data recently, and either its own or the global accumulator reached its threshold
+     */
+    public boolean isCgNeedColdDataFlowCtr(String consumerGroup) {
+        if (!this.messageStoreConfig.isColdDataFlowControlEnable()) {
+            return false;
+        }
+        if (MixAll.isSysConsumerGroupForNoColdReadLimit(consumerGroup)) {
+            return false;
+        }
+        AccAndTimeStamp accAndTimeStamp = cgColdThresholdMapRuntime.get(consumerGroup);
+        if (null == accAndTimeStamp) {
+            return false;
+        }
+
+        Long threshold = getThresholdByConsumerGroup(consumerGroup);
+        if (accAndTimeStamp.getColdAcc().get() >= threshold) {
+            return true;
+        }
+        return GLOBAL_ACC.get() >= this.brokerConfig.getGlobalColdReadThreshold();
+    }
+
+    /**
+     * @return {@code true} when the global accumulator exceeds the global cold-read threshold
+     */
+    public boolean isGlobalColdCtr() {
+        return GLOBAL_ACC.get() > this.brokerConfig.getGlobalColdReadThreshold();
+    }
+
+    public BrokerConfig getBrokerConfig() {
+        return brokerConfig;
+    }
+
+    /**
+     * Resolves the threshold in effect for a group: the admin threshold wins, then the adaptive
+     * threshold (only while the strategy is enabled), then the broker-wide default.
+     *
+     * @param consumerGroup group name, with or without the adaptive suffix
+     * @return the threshold in effect
+     */
+    private Long getThresholdByConsumerGroup(String consumerGroup) {
+        String plainGroup = stripAdaptiveSuffix(consumerGroup);
+        // A single get() instead of containsKey()+get() stays null-safe if the admin entry is
+        // removed concurrently.
+        Long adminThreshold = cgColdThresholdMapConfig.get(plainGroup);
+        if (null != adminThreshold) {
+            return adminThreshold;
+        }
+        Long threshold = null;
+        if (brokerConfig.isColdCtrStrategyEnable()) {
+            threshold = cgColdThresholdMapConfig.get(buildAdaptiveKey(plainGroup));
+        }
+        if (null == threshold) {
+            threshold = this.brokerConfig.getCgColdReadThreshold();
+        }
+        return threshold;
+    }
+
+    private String buildAdaptiveKey(String consumerGroup) {
+        return consumerGroup + ADAPTIVE;
+    }
+
+    /**
+     * Removes the adaptive suffix from a key if present. String.split must not be used for this:
+     * it treats "||adaptive" as a regular expression whose alternation matches the empty string,
+     * so element 0 of the result is just the first character of the group name.
+     */
+    private static String stripAdaptiveSuffix(String consumerGroup) {
+        if (consumerGroup.endsWith(ADAPTIVE)) {
+            return consumerGroup.substring(0, consumerGroup.length() - ADAPTIVE.length());
+        }
+        return consumerGroup;
+    }
+
+    /**
+     * @param consumerGroup group name, with or without the adaptive suffix
+     * @return {@code true} when an admin-configured threshold exists for the plain group name
+     */
+    private boolean isAdminConfig(String consumerGroup) {
+        return cgColdThresholdMapConfig.containsKey(stripAdaptiveSuffix(consumerGroup));
+    }
+
+    /** Drops every threshold that was written by the adaptive strategy. */
+    private void clearAdaptiveConfig() {
+        cgColdThresholdMapConfig.entrySet().removeIf(next -> next.getKey().endsWith(ADAPTIVE));
+    }
+
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataPullRequestHoldService.java
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataPullRequestHoldService.java
new file mode 100644
index 0000000000..c38d886fd3
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataPullRequestHoldService.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.longpolling.PullRequest;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.SystemClock;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+/**
+ * Holds pull requests that are reading cold data. Only requests of type pull qualify for this
+ * hold queue. A request handed to {@link #suspendColdDataReadRequest} is parked in
+ * {@code pullRequestColdHoldQueue}; once {@code coldHoldTimeoutMillis} have passed since its
+ * suspend timestamp it is marked with {@code NO_SUSPEND_KEY} and executed again.
+ */
+public class ColdDataPullRequestHoldService extends ServiceThread {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_COLDCTR_LOGGER_NAME);
+    /** Ext field added to a request before it is re-executed after being held. */
+    public static final String NO_SUSPEND_KEY = "_noSuspend_";
+
+    /** Time a request stays in the hold queue before it is executed again. */
+    private final long coldHoldTimeoutMillis = 3000;
+    private final SystemClock systemClock = new SystemClock();
+    private final BrokerController brokerController;
+    /** Bounded so that a flood of cold reads cannot grow the queue without limit. */
+    private final LinkedBlockingQueue<PullRequest> pullRequestColdHoldQueue = new LinkedBlockingQueue<>(10000);
+
+    public ColdDataPullRequestHoldService(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    /**
+     * Parks a pull request in the hold queue. Nothing happens while cold data flow control is
+     * disabled. When the queue is full the request cannot be held; this is logged instead of
+     * being dropped silently.
+     *
+     * @param pullRequest the pull request to hold
+     */
+    public void suspendColdDataReadRequest(PullRequest pullRequest) {
+        if (!this.brokerController.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+            return;
+        }
+        if (!pullRequestColdHoldQueue.offer(pullRequest)) {
+            log.warn("ColdDataPullRequestHoldService hold queue is full, pull request is not held, channel: {}",
+                pullRequest.getClientChannel());
+        }
+    }
+
+    @Override
+    public String getServiceName() {
+        return ColdDataPullRequestHoldService.class.getSimpleName();
+    }
+
+    /**
+     * Service loop: waits 5 s (flow control enabled) or 20 s (disabled) between rounds, then
+     * re-executes the held requests whose hold time has elapsed.
+     */
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                if (!this.brokerController.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+                    this.waitForRunning(20 * 1000);
+                } else {
+                    this.waitForRunning(5 * 1000);
+                }
+                long beginClockTimestamp = this.systemClock.now();
+                this.checkColdDataPullRequest();
+                long costTime = this.systemClock.now() - beginClockTimestamp;
+                log.info("[{}] checkColdDataPullRequest-cost {} ms.", costTime > 5 * 1000 ? "NOTIFYME" : "OK", costTime);
+            } catch (Throwable e) {
+                log.warn(this.getServiceName() + " service has exception", e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    /**
+     * Re-executes and removes every held request whose hold time has elapsed. A request is
+     * removed from the queue whether or not its re-execution succeeded.
+     */
+    private void checkColdDataPullRequest() {
+        int succTotal = 0;
+        int errorTotal = 0;
+        int queueSize = pullRequestColdHoldQueue.size();
+        Iterator<PullRequest> iterator = pullRequestColdHoldQueue.iterator();
+        while (iterator.hasNext()) {
+            PullRequest pullRequest = iterator.next();
+            if (System.currentTimeMillis() < pullRequest.getSuspendTimestamp() + coldHoldTimeoutMillis) {
+                continue;
+            }
+            try {
+                pullRequest.getRequestCommand().addExtField(NO_SUSPEND_KEY, "1");
+                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
+                    pullRequest.getClientChannel(), pullRequest.getRequestCommand());
+                succTotal++;
+            } catch (Exception e) {
+                log.error("PullRequestColdHoldService checkColdDataPullRequest error", e);
+                errorTotal++;
+            }
+            // remove the timed-out request from the queue
+            iterator.remove();
+        }
+        log.info("checkColdPullRequest-info-finish, queueSize: {} successTotal: {} errorTotal: {}",
+            queueSize, succTotal, errorTotal);
+    }
+
+}
\ No newline at end of file
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/PIDAdaptiveColdCtrStrategy.java
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/PIDAdaptiveColdCtrStrategy.java
new file mode 100644
index 0000000000..87d9789f71
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/PIDAdaptiveColdCtrStrategy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class PIDAdaptiveColdCtrStrategy implements ColdCtrStrategy {
+    /**
+     * Maximum number of recent error (et) values kept in the history
+     */
+    private static final int MAX_STORE_NUMS = 10;
+    /**
+     * Weights of the proportional, integral and derivative terms of the PID formula
+     */
+    private static final Double KP = 0.5;
+    private static final Double KI = 0.3;
+    private static final Double KD = 0.2;
+    private final List<Long> historyEtValList = new ArrayList<>();
+    private final ColdDataCgCtrService coldDataCgCtrService;
+    private final Long expectGlobalVal;
+    /** Most recent error value: expected global amount minus the collected one. */
+    private long et = 0L;
+
+    public PIDAdaptiveColdCtrStrategy(ColdDataCgCtrService coldDataCgCtrService, Long expectGlobalVal) {
+        this.coldDataCgCtrService = coldDataCgCtrService;
+        this.expectGlobalVal = expectGlobalVal;
+    }
+
+    /**
+     * Combines the latest error, the sum of the stored errors and the difference of the two most
+     * recent errors. Yields 0.0 until the history holds {@code MAX_STORE_NUMS} values.
+     */
+    @Override
+    public Double decisionFactor() {
+        final int sampleCount = historyEtValList.size();
+        if (sampleCount < MAX_STORE_NUMS) {
+            return 0.0;
+        }
+        final long newest = historyEtValList.get(sampleCount - 1);
+        final long previous = historyEtValList.get(sampleCount - 2);
+        final long differential = newest - previous;
+        double integration = 0.0;
+        for (long item : historyEtValList) {
+            integration += item;
+        }
+        return KP * et + KI * integration + KD * differential;
+    }
+
+    /**
+     * Multiplies the threshold by 1.5, but only while the decision factor is positive.
+     */
+    @Override
+    public void promote(String consumerGroup, Long currentThreshold) {
+        if (!(decisionFactor() > 0)) {
+            return;
+        }
+        final long raisedThreshold = (long) (currentThreshold * 1.5);
+        coldDataCgCtrService.addOrUpdateGroupConfig(consumerGroup, raisedThreshold);
+    }
+
+    /**
+     * Multiplies the threshold by 0.8, never going below the broker-wide group threshold, but
+     * only while the decision factor is negative.
+     */
+    @Override
+    public void decelerate(String consumerGroup, Long currentThreshold) {
+        if (!(decisionFactor() < 0)) {
+            return;
+        }
+        final long loweredThreshold = (long) (currentThreshold * 0.8);
+        final long floor = coldDataCgCtrService.getBrokerConfig().getCgColdReadThreshold();
+        coldDataCgCtrService.addOrUpdateGroupConfig(consumerGroup, loweredThreshold < floor ? floor : loweredThreshold);
+    }
+
+    /**
+     * Records the new error value and discards the oldest ones beyond {@code MAX_STORE_NUMS}.
+     */
+    @Override
+    public void collect(Long globalAcc) {
+        et = expectGlobalVal - globalAcc;
+        historyEtValList.add(et);
+        int surplus = historyEtValList.size() - MAX_STORE_NUMS;
+        for (Iterator<Long> oldest = historyEtValList.iterator(); surplus > 0 && oldest.hasNext(); surplus--) {
+            oldest.next();
+            oldest.remove();
+        }
+    }
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/SimpleColdCtrStrategy.java
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/SimpleColdCtrStrategy.java
new file mode 100644
index 0000000000..f26a242f9a
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/SimpleColdCtrStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+public class SimpleColdCtrStrategy implements ColdCtrStrategy {
+    private final ColdDataCgCtrService coldDataCgCtrService;
+
+    public SimpleColdCtrStrategy(ColdDataCgCtrService coldDataCgCtrService) {
+        this.coldDataCgCtrService = coldDataCgCtrService;
+    }
+
+    /**
+     * This strategy has no decision factor and always yields {@code null}.
+     */
+    @Override
+    public Double decisionFactor() {
+        return null;
+    }
+
+    /**
+     * Unconditionally multiplies the threshold by 1.5.
+     */
+    @Override
+    public void promote(String consumerGroup, Long currentThreshold) {
+        final long raisedThreshold = (long) (currentThreshold * 1.5);
+        coldDataCgCtrService.addOrUpdateGroupConfig(consumerGroup, raisedThreshold);
+    }
+
+    /**
+     * While the global cold-read control is active, multiplies the threshold by 0.8 without going
+     * below the broker-wide group threshold; otherwise does nothing.
+     */
+    @Override
+    public void decelerate(String consumerGroup, Long currentThreshold) {
+        if (coldDataCgCtrService.isGlobalColdCtr()) {
+            final long loweredThreshold = (long) (currentThreshold * 0.8);
+            final long floor = coldDataCgCtrService.getBrokerConfig().getCgColdReadThreshold();
+            coldDataCgCtrService.addOrUpdateGroupConfig(consumerGroup, loweredThreshold < floor ? floor : loweredThreshold);
+        }
+    }
+
+    /**
+     * Nothing is collected by this strategy.
+     */
+    @Override
+    public void collect(Long globalAcc) {
+    }
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0a05239e7f..892a713308 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -60,6 +60,7 @@ import org.apache.rocketmq.common.UnlockCallback;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.AttributeParser;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
+import org.apache.rocketmq.common.constant.FIleReadaheadMode;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -186,6 +187,7 @@ import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
+import org.apache.rocketmq.store.util.LibC;
import static
org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
@@ -215,6 +217,14 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return this.updateBrokerConfig(ctx, request);
case RequestCode.GET_BROKER_CONFIG:
return this.getBrokerConfig(ctx, request);
+ case RequestCode.UPDATE_COLD_DATA_FLOW_CTR_CONFIG:
+ return this.updateColdDataFlowCtrGroupConfig(ctx, request);
+ case RequestCode.REMOVE_COLD_DATA_FLOW_CTR_CONFIG:
+ return this.removeColdDataFlowCtrGroupConfig(ctx, request);
+ case RequestCode.GET_COLD_DATA_FLOW_CTR_INFO:
+ return this.getColdDataFlowCtrInfo(ctx);
+ case RequestCode.SET_COMMITLOG_READ_MODE:
+ return this.setCommitLogReadaheadMode(ctx, request);
case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
return this.searchOffsetByTimestamp(ctx, request);
case RequestCode.GET_MAX_OFFSET:
@@ -759,6 +769,131 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return response;
}
+    /**
+     * Adds or updates per-consumer-group cold-data flow-control thresholds.
+     * <p>
+     * The request body is decoded with {@code MixAll.DEFAULT_CHARSET} and parsed as properties in which
+     * each key is a consumer group and each value is that group's threshold, parsed as a {@code long}.
+     * An entry that cannot be applied is logged and skipped; the remaining entries are still processed.
+     * A request without a body changes nothing and is answered with SUCCESS.
+     *
+     * @param ctx channel context, used only to log the caller
+     * @param request command whose body carries the properties text
+     * @return SUCCESS, or SYSTEM_ERROR when the body cannot be decoded or parsed into properties
+     */
+    private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        LOGGER.info("updateColdDataFlowCtrGroupConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        final byte[] payload = request.getBody();
+        if (payload == null) {
+            // Nothing to apply; an empty request is still reported as a success.
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+
+        final Properties groupThresholds;
+        try {
+            groupThresholds = MixAll.string2Properties(new String(payload, MixAll.DEFAULT_CHARSET));
+        } catch (UnsupportedEncodingException e) {
+            LOGGER.error("updateColdDataFlowCtrGroupConfig UnsupportedEncodingException", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("UnsupportedEncodingException " + e);
+            return response;
+        }
+
+        if (groupThresholds == null) {
+            LOGGER.error("updateColdDataFlowCtrGroupConfig string2Properties error");
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("string2Properties error");
+            return response;
+        }
+
+        LOGGER.info("updateColdDataFlowCtrGroupConfig new config: {}, client: {}", groupThresholds, ctx.channel().remoteAddress());
+        groupThresholds.forEach((key, value) -> {
+            try {
+                String consumerGroup = String.valueOf(key);
+                Long threshold = Long.valueOf(String.valueOf(value));
+                this.brokerController.getColdDataCgCtrService().addOrUpdateGroupConfig(consumerGroup, threshold);
+            } catch (Exception e) {
+                // Best effort: one malformed entry must not block the others.
+                LOGGER.error("updateColdDataFlowCtrGroupConfig properties on entry error, key: {}, val: {}", key, value, e);
+            }
+        });
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Removes the cold-data flow-control configuration of one consumer group.
+     * <p>
+     * The request body is the consumer group name encoded with {@code MixAll.DEFAULT_CHARSET}.
+     * A request without a body changes nothing and is answered with SUCCESS.
+     *
+     * @param ctx channel context, used only to log the caller
+     * @param request command whose body carries the consumer group name
+     * @return SUCCESS, or SYSTEM_ERROR when the body cannot be decoded
+     */
+    private synchronized RemotingCommand removeColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx,
+        RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        LOGGER.info("removeColdDataFlowCtrGroupConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        byte[] body = request.getBody();
+        if (body != null) {
+            try {
+                // new String(...) never returns null, so no separate "parse error" branch is needed.
+                String consumerGroup = new String(body, MixAll.DEFAULT_CHARSET);
+                LOGGER.info("removeColdDataFlowCtrGroupConfig, consumerGroup: {} client: {}", consumerGroup, ctx.channel().remoteAddress());
+                this.brokerController.getColdDataCgCtrService().removeGroupConfig(consumerGroup);
+            } catch (UnsupportedEncodingException e) {
+                LOGGER.error("removeColdDataFlowCtrGroupConfig UnsupportedEncodingException", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+        }
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns the cold-data flow-control report produced by the broker's ColdDataCgCtrService.
+     * <p>
+     * When the service supplies a report it is sent as the response body, encoded with
+     * {@code MixAll.DEFAULT_CHARSET}; when it supplies none, the response has no body and is still SUCCESS.
+     *
+     * @param ctx channel context, used only to log the caller
+     * @return SUCCESS, or SYSTEM_ERROR when the report cannot be encoded
+     */
+    private RemotingCommand getColdDataFlowCtrInfo(ChannelHandlerContext ctx) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        LOGGER.info("getColdDataFlowCtrInfo called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        final String flowCtrInfo = this.brokerController.getColdDataCgCtrService().getColdDataFlowCtrInfo();
+        try {
+            if (flowCtrInfo != null) {
+                response.setBody(flowCtrInfo.getBytes(MixAll.DEFAULT_CHARSET));
+            }
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } catch (UnsupportedEncodingException e) {
+            LOGGER.error("getColdDataFlowCtrInfo UnsupportedEncodingException", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("UnsupportedEncodingException " + e);
+        }
+        return response;
+    }
+
+    /**
+     * Switches the read-ahead mode of the commit log files.
+     * <p>
+     * The mode is read from the {@code FIleReadaheadMode.READ_AHEAD_MODE} ext field and must equal
+     * {@code LibC.MADV_NORMAL} or {@code LibC.MADV_RANDOM}. When the broker's store is a
+     * {@code DefaultMessageStore}, the store config's dataReadAheadEnable flag is set to match the mode
+     * and the commit log is asked to rescan its files with that mode; for any other store type nothing
+     * is changed and SUCCESS is still returned.
+     *
+     * @param ctx channel context, used only to log the caller
+     * @param request command carrying the mode in its ext fields
+     * @return SUCCESS with the applied mode in the remark, or SYSTEM_ERROR on a missing, malformed or
+     *         unsupported mode or on any failure while applying it
+     */
+    private RemotingCommand setCommitLogReadaheadMode(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        LOGGER.info("setCommitLogReadaheadMode called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        try {
+            final HashMap<String, String> fields = request.getExtFields();
+            if (fields == null) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("set commitlog readahead mode param error");
+                return response;
+            }
+
+            // A missing or non-numeric value makes parseInt throw; the catch below reports it.
+            final int mode = Integer.parseInt(fields.get(FIleReadaheadMode.READ_AHEAD_MODE));
+            final boolean supportedMode = mode == LibC.MADV_RANDOM || mode == LibC.MADV_NORMAL;
+            if (!supportedMode) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("set commitlog readahead mode param value error");
+                return response;
+            }
+
+            final MessageStore store = this.brokerController.getMessageStore();
+            if (store instanceof DefaultMessageStore) {
+                final DefaultMessageStore defaultStore = (DefaultMessageStore) store;
+                // Read-ahead is enabled exactly when the requested mode is MADV_NORMAL.
+                defaultStore.getMessageStoreConfig().setDataReadAheadEnable(mode == LibC.MADV_NORMAL);
+                defaultStore.getCommitLog().scanFileAndSetReadMode(mode);
+            }
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark("set commitlog readahead mode success, mode: " + mode);
+        } catch (Exception e) {
+            LOGGER.error("set commitlog readahead mode failed", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("set commitlog readahead mode failed");
+        }
+        return response;
+    }
+
private synchronized RemotingCommand
updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 8df2265c2c..b2794b1289 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -24,10 +24,12 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.coldctr.ColdDataPullRequestHoldService;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
@@ -65,6 +67,7 @@ import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfi
import org.apache.rocketmq.remoting.rpc.RpcClientUtils;
import org.apache.rocketmq.remoting.rpc.RpcRequest;
import org.apache.rocketmq.remoting.rpc.RpcResponse;
+import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
@@ -283,7 +286,7 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
- return this.processRequest(ctx.channel(), request, true);
+ return this.processRequest(ctx.channel(), request, true, true);
}
@Override
@@ -295,7 +298,7 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
return false;
}
- private RemotingCommand processRequest(final Channel channel,
RemotingCommand request, boolean brokerAllowSuspend)
+ private RemotingCommand processRequest(final Channel channel,
RemotingCommand request, boolean brokerAllowSuspend, boolean
brokerAllowFlowCtrSuspend)
throws RemotingCommandException {
RemotingCommand response =
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.readCustomHeader();
@@ -484,6 +487,32 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
}
final MessageStore messageStore = brokerController.getMessageStore();
+        // Cold-data flow control. Applies only to DefaultMessageStore and only to consumer groups that the
+        // cold-ctr service currently marks for control, and only when the requested offset is in the cold area.
+        if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
+            DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
+            boolean cgNeedColdDataFlowCtr = brokerController.getColdDataCgCtrService().isCgNeedColdDataFlowCtr(requestHeader.getConsumerGroup());
+            if (cgNeedColdDataFlowCtr) {
+                boolean isMsgLogicCold = defaultMessageStore.getCommitLog()
+                    .getColdDataCheckService().isMsgInColdArea(requestHeader.getConsumerGroup(),
+                        requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset());
+                if (isMsgLogicCold) {
+                    // The consumer manager may hold no info for this group (presumably when no client of the
+                    // group is currently registered); guard it so the pull path cannot throw a
+                    // NullPointerException. With an unknown consume type no flow control is applied.
+                    ConsumerGroupInfo coldReadGroupInfo =
+                        this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+                    ConsumeType consumeType = coldReadGroupInfo == null ? null : coldReadGroupInfo.getConsumeType();
+                    if (consumeType == ConsumeType.CONSUME_PASSIVELY) {
+                        // Passive consumers are rejected outright.
+                        response.setCode(ResponseCode.SYSTEM_BUSY);
+                        response.setRemark("This consumer group is reading cold data. It has been flow control");
+                        return response;
+                    } else if (consumeType == ConsumeType.CONSUME_ACTIVELY) {
+                        if (brokerAllowFlowCtrSuspend) { // second arrived, which will not be held
+                            // First arrival: park the request in the hold service (1000 is presumably a
+                            // timeout in milliseconds; confirm against PullRequest) and send no response now.
+                            PullRequest pullRequest = new PullRequest(request, channel, 1000,
+                                this.brokerController.getMessageStore().now(), requestHeader.getQueueOffset(), subscriptionData, messageFilter);
+                            this.brokerController.getColdDataPullRequestHoldService().suspendColdDataReadRequest(pullRequest);
+                            return null;
+                        }
+                        // Re-processed request: serve it, but limit the batch to a single message.
+                        requestHeader.setMaxMsgNums(1);
+                    }
+                }
+            }
+        }
+
final boolean useResetOffsetFeature =
brokerController.getBrokerConfig().isUseServerSideResetOffset();
String topic = requestHeader.getTopic();
String group = requestHeader.getConsumerGroup();
@@ -515,7 +544,7 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
finalResponse.setRemark("store getMessage return
null");
return finalResponse;
}
-
+
brokerController.getColdDataCgCtrService().coldAcc(requestHeader.getConsumerGroup(),
result.getColdDataSum());
return pullMessageResultHandler.handle(
result,
request,
@@ -746,7 +775,8 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
public void executeRequestWhenWakeup(final Channel channel, final
RemotingCommand request) {
Runnable run = () -> {
try {
- final RemotingCommand response =
PullMessageProcessor.this.processRequest(channel, request, false);
+ boolean brokerAllowFlowCtrSuspend = !(request.getExtFields()
!= null &&
request.getExtFields().containsKey(ColdDataPullRequestHoldService.NO_SUSPEND_KEY));
+ final RemotingCommand response =
PullMessageProcessor.this.processRequest(channel, request, false,
brokerAllowFlowCtrSuspend);
if (response != null) {
response.setOpaque(request.getOpaque());
diff --git a/broker/src/main/resources/rmq.broker.logback.xml
b/broker/src/main/resources/rmq.broker.logback.xml
index 7902c0526a..78b1aea411 100644
--- a/broker/src/main/resources/rmq.broker.logback.xml
+++ b/broker/src/main/resources/rmq.broker.logback.xml
@@ -498,6 +498,34 @@
<appender-ref ref="RocketmqPopSiftingAppender_inner"/>
</appender>
+ <!-- Cold-read control log: a sifting appender keyed on brokerContainerLogDir that wraps a rolling
+ file appender writing coldctr.log. The file rolls once it reaches 100MB and rolled files are kept
+ under indexes 1 to 10 in otherdays/.
+ NOTE(review): the file path below does not reference the discriminator value; confirm whether
+ separate files per broker are intended. -->
+ <appender name="RocketmqColdCtrSiftingAppender"
class="ch.qos.logback.classic.sift.SiftingAppender">
+ <discriminator>
+ <key>brokerContainerLogDir</key>
+ <defaultValue>${file.separator}</defaultValue>
+ </discriminator>
+ <sift>
+ <appender name="RocketmqColdCtrAppender"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${user.home}/logs/rocketmqlogs/coldctr.log</file>
+ <append>true</append>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+
<fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/coldctr.%i.log
+ </fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>10</maxIndex>
+ </rollingPolicy>
+ <triggeringPolicy
+
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+ <encoder>
+ <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t -
%m%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
+ </encoder>
+ </appender>
+ </sift>
+ </appender>
+
<appender name="RocketmqBrokerMetricsAppender"
class="ch.qos.logback.classic.sift.SiftingAppender">
<discriminator>
<key>brokerContainerLogDir</key>
@@ -595,6 +623,11 @@
<appender-ref ref="RocketmqPopSiftingAppender"/>
</logger>
+ <!-- Logger named "RocketmqColdCtr": INFO level, routed only to the cold-ctr sifting appender
+ (additivity is off, so events are not passed on to parent loggers). -->
+ <logger name="RocketmqColdCtr" additivity="false">
+ <level value="INFO"/>
+ <appender-ref ref="RocketmqColdCtrSiftingAppender"/>
+ </logger>
+
<logger name="RocketmqTraffic" additivity="false" level="INFO">
<appender-ref ref="RocketmqTrafficSiftingAppender"/>
</logger>
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 2c7a988ee4..995362bb77 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.AttributeParser;
+import org.apache.rocketmq.common.constant.FIleReadaheadMode;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -1748,6 +1749,82 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
throw new MQBrokerException(response.getCode(), response.getRemark(),
addr);
}
+    /**
+     * Sends per-consumer-group cold-data flow-control thresholds to a broker.
+     * <p>
+     * The properties are serialized with {@code MixAll.properties2String} and sent as the request body.
+     * When the serialized text is null or empty no request is sent and the method returns silently.
+     *
+     * @param addr broker address
+     * @param properties consumer group to threshold mapping
+     * @param timeoutMillis timeout passed to the synchronous invocation
+     * @throws MQBrokerException when the broker answers with a code other than SUCCESS; carries the broker address
+     */
+    public void updateColdDataFlowCtrGroupConfig(final String addr, final Properties properties, final long timeoutMillis)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_COLD_DATA_FLOW_CTR_CONFIG, null);
+        String str = MixAll.properties2String(properties);
+        if (str != null && str.length() > 0) {
+            request.setBody(str.getBytes(MixAll.DEFAULT_CHARSET));
+            RemotingCommand response = this.remotingClient.invokeSync(
+                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                return;
+            }
+            // Include the broker address so the failure can be attributed to a broker.
+            throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
+        }
+    }
+
+    /**
+     * Asks a broker to remove the cold-data flow-control configuration of one consumer group.
+     * <p>
+     * The group name is sent as the request body. When it is null or empty no request is sent and the
+     * method returns silently.
+     *
+     * @param addr broker address
+     * @param consumerGroup consumer group whose configuration is removed
+     * @param timeoutMillis timeout passed to the synchronous invocation
+     * @throws MQBrokerException when the broker answers with a code other than SUCCESS; carries the broker address
+     */
+    public void removeColdDataFlowCtrGroupConfig(final String addr, final String consumerGroup, final long timeoutMillis)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REMOVE_COLD_DATA_FLOW_CTR_CONFIG, null);
+        if (consumerGroup != null && consumerGroup.length() > 0) {
+            request.setBody(consumerGroup.getBytes(MixAll.DEFAULT_CHARSET));
+            RemotingCommand response = this.remotingClient.invokeSync(
+                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                return;
+            }
+            // Include the broker address so the failure can be attributed to a broker.
+            throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
+        }
+    }
+
+    /**
+     * Fetches the cold-data flow-control report from a broker.
+     *
+     * @param addr broker address
+     * @param timeoutMillis timeout passed to the synchronous invocation
+     * @return the response body decoded with {@code MixAll.DEFAULT_CHARSET}, or {@code null} when the
+     *         broker answered SUCCESS with no body
+     * @throws MQBrokerException when the broker answers with a code other than SUCCESS; carries the broker address
+     */
+    public String getColdDataFlowCtrInfo(final String addr, final long timeoutMillis)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_COLD_DATA_FLOW_CTR_INFO, null);
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        assert response != null;
+        if (response.getCode() == ResponseCode.SUCCESS) {
+            byte[] body = response.getBody();
+            if (null != body && body.length > 0) {
+                return new String(body, MixAll.DEFAULT_CHARSET);
+            }
+            return null;
+        }
+        // Include the broker address so the failure can be attributed to a broker.
+        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
+    }
+
+    /**
+     * Asks a broker to switch the read-ahead mode of its commit log files.
+     * <p>
+     * The mode is sent in the {@code FIleReadaheadMode.READ_AHEAD_MODE} ext field of the request.
+     *
+     * @param addr broker address
+     * @param mode mode value as text; the broker parses it as an integer
+     * @param timeoutMillis timeout passed to the synchronous invocation
+     * @return the broker's remark on SUCCESS, or {@code null} when the remark is absent or empty
+     * @throws MQBrokerException when the broker answers with a code other than SUCCESS; carries the broker address
+     */
+    public String setCommitLogReadAheadMode(final String addr, final String mode, final long timeoutMillis)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SET_COMMITLOG_READ_MODE, null);
+        HashMap<String, String> extFields = new HashMap<>();
+        extFields.put(FIleReadaheadMode.READ_AHEAD_MODE, mode);
+        request.setExtFields(extFields);
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        assert response != null;
+        if (response.getCode() == ResponseCode.SUCCESS) {
+            String remark = response.getRemark();
+            if (null != remark && remark.length() > 0) {
+                return remark;
+            }
+            return null;
+        }
+        // Include the broker address so the failure can be attributed to a broker.
+        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
+    }
+
public ClusterInfo getBrokerClusterInfo(
final long timeoutMillis) throws InterruptedException,
RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index e9fad05e51..47ce2cb8d1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -370,6 +370,10 @@ public class BrokerConfig extends BrokerIdentity {
*/
private boolean estimateAccumulation = true;
+ // Cold-read control settings (RIP-62).
+ // Presumably the master switch for the cold-read control strategy; off by default. TODO confirm against ColdDataCgCtrService.
+ private boolean coldCtrStrategyEnable = false;
+ // Presumably selects the PID-adaptive strategy over the simple one; on by default. TODO confirm.
+ private boolean usePIDColdCtrStrategy = true;
+ // Per-consumer-group cold-read threshold; default 3 * 1024 * 1024 (presumably bytes; confirm the unit and time window).
+ private long cgColdReadThreshold = 3 * 1024 * 1024;
+ // Broker-wide cold-read threshold; default 100 * 1024 * 1024 (presumably bytes; confirm the unit and time window).
+ private long globalColdReadThreshold = 100 * 1024 * 1024;
public long getMaxPopPollingSize() {
return maxPopPollingSize;
@@ -1619,6 +1623,38 @@ public class BrokerConfig extends BrokerIdentity {
this.estimateAccumulation = estimateAccumulation;
}
+ // Plain accessors for the cold-read control settings; each getter returns the field of the same
+ // name and each setter overwrites it without validation.
+ public boolean isColdCtrStrategyEnable() {
+ return coldCtrStrategyEnable;
+ }
+
+ public void setColdCtrStrategyEnable(boolean coldCtrStrategyEnable) {
+ this.coldCtrStrategyEnable = coldCtrStrategyEnable;
+ }
+
+ public boolean isUsePIDColdCtrStrategy() {
+ return usePIDColdCtrStrategy;
+ }
+
+ public void setUsePIDColdCtrStrategy(boolean usePIDColdCtrStrategy) {
+ this.usePIDColdCtrStrategy = usePIDColdCtrStrategy;
+ }
+
+ public long getCgColdReadThreshold() {
+ return cgColdReadThreshold;
+ }
+
+ public void setCgColdReadThreshold(long cgColdReadThreshold) {
+ this.cgColdReadThreshold = cgColdReadThreshold;
+ }
+
+ public long getGlobalColdReadThreshold() {
+ return globalColdReadThreshold;
+ }
+
+ public void setGlobalColdReadThreshold(long globalColdReadThreshold) {
+ this.globalColdReadThreshold = globalColdReadThreshold;
+ }
+
public boolean isUseStaticSubscription() {
return useStaticSubscription;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 3d6f5d68d0..dc1d69fe10 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -498,4 +498,21 @@ public class MixAll {
return path.normalize().toString();
}
+    /**
+     * Tells whether a consumer group is one of the built-in groups listed here, or starts with
+     * {@code CID_RMQ_SYS_PREFIX}. Judging by the method name, such groups are exempt from the
+     * cold-read limit; the exemption itself is applied by the caller.
+     *
+     * @param consumerGroup consumer group name; {@code null} is treated as not a system group
+     * @return {@code true} when the group matches one of the listed names or the system prefix
+     */
+    public static boolean isSysConsumerGroupForNoColdReadLimit(String consumerGroup) {
+        if (consumerGroup == null) {
+            // The equals checks below tolerate null but startsWith would throw; answer false instead.
+            return false;
+        }
+        return DEFAULT_CONSUMER_GROUP.equals(consumerGroup)
+            || TOOLS_CONSUMER_GROUP.equals(consumerGroup)
+            || SCHEDULE_CONSUMER_GROUP.equals(consumerGroup)
+            || FILTERSRV_CONSUMER_GROUP.equals(consumerGroup)
+            || MONITOR_CONSUMER_GROUP.equals(consumerGroup)
+            || SELF_TEST_CONSUMER_GROUP.equals(consumerGroup)
+            || ONS_HTTP_PROXY_GROUP.equals(consumerGroup)
+            || CID_ONSAPI_PERMISSION_GROUP.equals(consumerGroup)
+            || CID_ONSAPI_OWNER_GROUP.equals(consumerGroup)
+            || CID_ONSAPI_PULL_GROUP.equals(consumerGroup)
+            || CID_SYS_RMQ_TRANS.equals(consumerGroup)
+            || consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
+    }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/coldctr/AccAndTimeStamp.java
b/common/src/main/java/org/apache/rocketmq/common/coldctr/AccAndTimeStamp.java
new file mode 100644
index 0000000000..212bc08c48
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/coldctr/AccAndTimeStamp.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.coldctr;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Holder pairing a cold-read accumulator with two timestamps: the time of the last cold read and
+ * the creation time. Both timestamps start at the current system time when the object is built.
+ */
+public class AccAndTimeStamp {
+
+    public AtomicLong coldAcc;
+    public Long lastColdReadTimeMills;
+    public Long createTimeMills;
+
+    /**
+     * @param coldAcc accumulator to hold; stored as given
+     */
+    public AccAndTimeStamp(AtomicLong coldAcc) {
+        this.lastColdReadTimeMills = System.currentTimeMillis();
+        this.createTimeMills = System.currentTimeMillis();
+        this.coldAcc = coldAcc;
+    }
+
+    public AtomicLong getColdAcc() {
+        return this.coldAcc;
+    }
+
+    public void setColdAcc(AtomicLong coldAcc) {
+        this.coldAcc = coldAcc;
+    }
+
+    public Long getLastColdReadTimeMills() {
+        return this.lastColdReadTimeMills;
+    }
+
+    public void setLastColdReadTimeMills(Long lastColdReadTimeMills) {
+        this.lastColdReadTimeMills = lastColdReadTimeMills;
+    }
+
+    public Long getCreateTimeMills() {
+        return this.createTimeMills;
+    }
+
+    public void setCreateTimeMills(Long createTimeMills) {
+        this.createTimeMills = createTimeMills;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("AccAndTimeStamp{");
+        text.append("coldAcc=").append(coldAcc);
+        text.append(", lastColdReadTimeMills=").append(lastColdReadTimeMills);
+        text.append(", createTimeMills=").append(createTimeMills);
+        return text.append('}').toString();
+    }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/constant/FIleReadaheadMode.java
b/common/src/main/java/org/apache/rocketmq/common/constant/FIleReadaheadMode.java
new file mode 100644
index 0000000000..e23a5f5878
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/constant/FIleReadaheadMode.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.constant;
+
+/**
+ * Constants for the commit log read-ahead mode admin request.
+ */
+public class FIleReadaheadMode {
+ // Ext-field key under which the client sends, and the broker reads, the requested read-ahead mode.
+ public static final String READ_AHEAD_MODE = "READ_AHEAD_MODE";
+}
\ No newline at end of file
diff --git
a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index a871fc5272..c1176ea153 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -50,4 +50,5 @@ public class LoggerName {
public static final String STDOUT_LOGGER_NAME = "STDOUT";
public static final String PROXY_LOGGER_NAME = "RocketmqProxy";
public static final String PROXY_WATER_MARK_LOGGER_NAME =
"RocketmqProxyWatermark";
+ // Logger name for cold-read control logs; matches the "RocketmqColdCtr" logger declared in rmq.broker.logback.xml.
+ public static final String ROCKETMQ_COLDCTR_LOGGER_NAME =
"RocketmqColdCtr";
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index 9f9a64ed0e..ec87039b41 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -275,6 +275,11 @@ public class RequestCode {
*/
public static final int CLEAN_BROKER_DATA = 1011;
+ // Admin request codes for cold-data flow control and commit log read-ahead mode; AdminBrokerProcessor
+ // dispatches each of them to a handler of the matching name.
+ // Add or update per-consumer-group cold-data thresholds.
+ public static final int UPDATE_COLD_DATA_FLOW_CTR_CONFIG = 2001;
+ // Remove one consumer group's cold-data configuration.
+ public static final int REMOVE_COLD_DATA_FLOW_CTR_CONFIG = 2002;
+ // Fetch the cold-data flow-control report.
+ public static final int GET_COLD_DATA_FLOW_CTR_INFO = 2003;
+ // Switch the commit log read-ahead mode.
+ public static final int SET_COMMITLOG_READ_MODE = 2004;
+
public static final int CONTROLLER_GET_NEXT_BROKER_ID = 1012;
public static final int CONTROLLER_APPLY_BROKER_ID = 1013;
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 56f19529d1..5a5c90c5a0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -27,12 +27,17 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
@@ -54,6 +59,8 @@ import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.util.LibC;
+import sun.nio.ch.DirectBuffer;
/**
* Store all metadata downtime for recovery, data protection reliability
@@ -68,6 +75,7 @@ public class CommitLog implements Swappable {
protected final DefaultMessageStore defaultMessageStore;
private final FlushManager flushManager;
+ private final ColdDataCheckService coldDataCheckService;
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
@@ -101,6 +109,7 @@ public class CommitLog implements Swappable {
this.defaultMessageStore = messageStore;
this.flushManager = new DefaultFlushManager();
+ this.coldDataCheckService = new ColdDataCheckService();
this.appendMessageCallback = new DefaultAppendMessageCallback();
putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
@@ -136,6 +145,9 @@ public class CommitLog implements Swappable {
public boolean load() {
boolean result = this.mappedFileQueue.load();
+ if (result &&
!defaultMessageStore.getMessageStoreConfig().isDataReadAheadEnable()) {
+ scanFileAndSetReadMode(LibC.MADV_RANDOM);
+ }
this.mappedFileQueue.checkSelf();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
@@ -146,12 +158,18 @@ public class CommitLog implements Swappable {
log.info("start commitLog successfully. storeRoot: {}",
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
flushDiskWatcher.setDaemon(true);
flushDiskWatcher.start();
+ if (this.coldDataCheckService != null) {
+ this.coldDataCheckService.start();
+ }
}
public void shutdown() {
this.flushManager.shutdown();
log.info("shutdown commitLog successfully. storeRoot: {}",
this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
flushDiskWatcher.shutdown(true);
+ if (this.coldDataCheckService != null) {
+ this.coldDataCheckService.shutdown();
+ }
}
public long flush() {
@@ -901,6 +919,9 @@ public class CommitLog implements Swappable {
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); //
Mark: NewFile may be cause noise
+ if (isCloseReadAhead()) {
+ setFileReadMode(mappedFile, LibC.MADV_RANDOM);
+ }
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " +
msg.getTopic() + " clientAddr: " + msg.getBornHostString());
@@ -924,6 +945,9 @@ public class CommitLog implements Swappable {
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
}
+ if (isCloseReadAhead()) {
+ setFileReadMode(mappedFile, LibC.MADV_RANDOM);
+ }
result = mappedFile.appendMessage(msg,
this.appendMessageCallback, putMessageContext);
if
(AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
onCommitLogAppend(msg, result, mappedFile);
@@ -1060,6 +1084,9 @@ public class CommitLog implements Swappable {
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); //
Mark: NewFile may be cause noise
+ if (isCloseReadAhead()) {
+ setFileReadMode(mappedFile, LibC.MADV_RANDOM);
+ }
}
if (null == mappedFile) {
log.error("Create mapped file1 error, topic: {}
clientAddr: {}", messageExtBatch.getTopic(),
messageExtBatch.getBornHostString());
@@ -1081,6 +1108,9 @@ public class CommitLog implements Swappable {
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
}
+ if (isCloseReadAhead()) {
+ setFileReadMode(mappedFile, LibC.MADV_RANDOM);
+ }
result = mappedFile.appendMessages(messageExtBatch,
this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
@@ -1236,7 +1266,11 @@ public class CommitLog implements Swappable {
MappedFile mappedFile =
this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
- return mappedFile.selectMappedBuffer(pos, size);
+ SelectMappedBufferResult selectMappedBufferResult =
mappedFile.selectMappedBuffer(pos, size);
+ if (null != selectMappedBufferResult) {
+
selectMappedBufferResult.setInCache(coldDataCheckService.isDataInPageCache(offset));
+ return selectMappedBufferResult;
+ }
}
return null;
}
@@ -2018,4 +2052,207 @@ public class CommitLog implements Swappable {
public FlushManager getFlushManager() {
return flushManager;
}
+
+ /**
+  * Tells whether read-ahead should be switched off for commit log files:
+  * {@code true} only on non-Windows hosts when {@code dataReadAheadEnable} is false.
+  */
+ private boolean isCloseReadAhead() {
+     if (MixAll.isWindows()) {
+         return false;
+     }
+     return !defaultMessageStore.getMessageStoreConfig().isDataReadAheadEnable();
+ }
+
+ public class ColdDataCheckService extends ServiceThread {
+ private final SystemClock systemClock = new SystemClock();
+ private final ConcurrentHashMap<String, byte[]> pageCacheMap = new
ConcurrentHashMap<>();
+ private int pageSize = -1;
+ private int sampleSteps = 32;
+
+ /**
+  * Reads the sampling step from the store configuration (falling back to 32 when the
+  * configured value is not positive), resolves the page size and takes a first scan.
+  */
+ public ColdDataCheckService() {
+     final int configuredSteps = defaultMessageStore.getMessageStoreConfig().getSampleSteps();
+     sampleSteps = configuredSteps > 0 ? configuredSteps : 32;
+     initPageSize();
+     scanFilesInPageCache();
+ }
+
+ /**
+ * Returns the simple class name of this service; {@link #run()} uses it in its log messages.
+ */
+ @Override
+ public String getServiceName() {
+ return ColdDataCheckService.class.getSimpleName();
+ }
+
+ /**
+  * Service loop: refreshes the page-cache snapshot until the service is stopped.
+  * On Windows, or while cold data flow control or scanning is disabled, the snapshot
+  * is cleared and the loop waits up to three minutes before checking again; otherwise
+  * it waits for the configured check interval and then rescans.
+  */
+ @Override
+ public void run() {
+     log.info("{} service started", this.getServiceName());
+     while (!this.isStopped()) {
+         try {
+             if (MixAll.isWindows()
+                 || !defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()
+                 || !defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                 pageCacheMap.clear();
+                 this.waitForRunning(180 * 1000);
+                 continue;
+             } else {
+                 this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
+             }
+             long beginClockTimestamp = this.systemClock.now();
+             scanFilesInPageCache();
+             long costTime = this.systemClock.now() - beginClockTimestamp;
+             // Scans slower than 30 seconds are tagged so they stand out in the log.
+             log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime > 30 * 1000 ? "NOTIFYME" : "OK", costTime);
+         } catch (Throwable e) {
+             // The throwable is the trailing argument, so no "{}" placeholder is needed for it;
+             // the previous message left a dangling placeholder in the output.
+             log.warn("{} service has exception", this.getServiceName(), e);
+         }
+     }
+     log.info("{} service end", this.getServiceName());
+ }
+
+ /**
+  * Estimates from the latest sampled snapshot whether the commit log data at
+  * {@code offset} is resident in the page cache. Answers {@code true} whenever no
+  * judgement can be made: flow control disabled, page size or sample step unknown,
+  * offset outside the cold area, or no mapped file / snapshot for the offset.
+  *
+  * @param offset physical commit log offset
+  * @return {@code false} only when the data is considered cold and not cached
+  */
+ public boolean isDataInPageCache(final long offset) {
+     if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()
+         || pageSize <= 0 || sampleSteps <= 0) {
+         return true;
+     }
+     if (!defaultMessageStore.checkInColdAreaByCommitOffset(offset, getMaxOffset())) {
+         return true;
+     }
+     // In the cold area with scanning switched off there is no snapshot to consult.
+     if (!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+         return false;
+     }
+
+     final MappedFile targetFile = mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+     final byte[] residency = null == targetFile ? null : pageCacheMap.get(targetFile.getFileName());
+     if (null == residency) {
+         return true;
+     }
+
+     // Map the in-file position to a page, then to its slot in the sampled table.
+     final int posInFile = (int) (offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
+     final int sampleIndex = posInFile / pageSize / sampleSteps;
+     return sampleIndex < residency.length && residency[sampleIndex] != 0;
+ }
+
+ /**
+  * Rebuilds the per-file page residency snapshot: drops entries of files that are no
+  * longer mapped, then stores a (possibly sampled) residency table for every mapped file.
+  * Does nothing on Windows, when flow control or scanning is disabled, or when the page
+  * size is unknown.
+  */
+ private void scanFilesInPageCache() {
+     if (MixAll.isWindows()) {
+         return;
+     }
+     if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+         return;
+     }
+     if (!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable() || pageSize <= 0) {
+         return;
+     }
+     try {
+         log.info("pageCacheMap key size: {}", pageCacheMap.size());
+         clearExpireMappedFile();
+         for (MappedFile file : mappedFileQueue.getMappedFiles()) {
+             final byte[] fullTable = checkFileInPageCache(file);
+             final byte[] stored = sampleSteps > 1 ? sampling(fullTable, sampleSteps) : fullTable;
+             pageCacheMap.put(file.getFileName(), stored);
+         }
+     } catch (Exception e) {
+         log.error("scanFilesInPageCache exception", e);
+     }
+ }
+
+ /**
+  * Removes snapshot entries whose file name no longer appears among the mapped files.
+  */
+ private void clearExpireMappedFile() {
+     final Set<String> liveFileNames = mappedFileQueue.getMappedFiles().stream()
+         .map(MappedFile::getFileName)
+         .collect(Collectors.toSet());
+     for (String cachedName : pageCacheMap.keySet()) {
+         if (liveFileNames.contains(cachedName)) {
+             continue;
+         }
+         pageCacheMap.remove(cachedName);
+         log.info("clearExpireMappedFile fileName: {}, has been clear", cachedName);
+     }
+ }
+
+ /**
+  * Down-samples a page residency table by keeping every {@code sampleStep}-th entry,
+  * starting with the first.
+  *
+  * @param pageCacheTable full table, one byte per page
+  * @param sampleStep distance between kept entries
+  * @return table of length {@code ceil(pageCacheTable.length / sampleStep)}
+  */
+ private byte[] sampling(byte[] pageCacheTable, int sampleStep) {
+     final byte[] sample = new byte[(pageCacheTable.length + sampleStep - 1) / sampleStep];
+     int source = 0;
+     int target = 0;
+     while (source < pageCacheTable.length && target < sample.length) {
+         sample[target] = pageCacheTable[source];
+         target++;
+         source += sampleStep;
+     }
+     return sample;
+ }
+
+ /**
+  * Queries {@code mincore} for the whole mapped file and returns its result vector,
+  * one byte per page. If the call fails, every page is reported as resident.
+  *
+  * @param mappedFile file whose mapped buffer is inspected
+  * @return residency vector with one entry per page of the file
+  */
+ private byte[] checkFileInPageCache(MappedFile mappedFile) {
+     long fileSize = mappedFile.getFileSize();
+     final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address();
+     // Divide in long arithmetic and narrow afterwards; casting the sum first would
+     // truncate it for file sizes close to Integer.MAX_VALUE.
+     int pageNums = (int) ((fileSize + this.pageSize - 1) / this.pageSize);
+     byte[] pageCacheRst = new byte[pageNums];
+     int mincore = LibC.INSTANCE.mincore(new Pointer(address), new NativeLong(fileSize), pageCacheRst);
+     if (mincore != 0) {
+         log.error("checkFileInPageCache call the LibC.INSTANCE.mincore error, fileName: {}, fileSize: {}",
+             mappedFile.getFileName(), fileSize);
+         // Fail open: mark every page as resident.
+         for (int i = 0; i < pageNums; i++) {
+             pageCacheRst[i] = 1;
+         }
+     }
+     return pageCacheRst;
+ }
+
+ /**
+  * Resolves the page size once via {@code getpagesize}. On Windows, or when the lookup
+  * throws an exception, cold data flow control is switched off in the store configuration.
+  */
+ private void initPageSize() {
+     if (pageSize >= 0) {
+         return;
+     }
+     try {
+         if (MixAll.isWindows()) {
+             defaultMessageStore.getMessageStoreConfig().setColdDataFlowControlEnable(false);
+             log.info("windows os, coldDataCheckEnable force setting to be false");
+         } else {
+             pageSize = LibC.INSTANCE.getpagesize();
+         }
+         log.info("initPageSize pageSize: {}", pageSize);
+     } catch (Exception e) {
+         defaultMessageStore.getMessageStoreConfig().setColdDataFlowControlEnable(false);
+         log.error("initPageSize error, coldDataCheckEnable force setting to be false ", e);
+     }
+ }
+
+ /**
+  * Checks whether the message at the given consume queue offset lies in the cold area
+  * of the commit log. The result is only an approximation.
+  *
+  * @param group consumer group, used for logging only
+  * @param topic topic of the consume queue
+  * @param queueId id of the consume queue
+  * @param offset logical offset in the consume queue
+  * @return {@code true} if the message's commit log offset is in the cold area;
+  *         {@code false} if flow control is disabled, the entry cannot be read, or an error occurs
+  */
+ public boolean isMsgInColdArea(String group, String topic, int queueId, long offset) {
+     if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+         return false;
+     }
+     try {
+         ConsumeQueue consumeQueue = (ConsumeQueue) defaultMessageStore.findConsumeQueue(topic, queueId);
+         if (null == consumeQueue) {
+             return false;
+         }
+         SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
+         if (null == bufferConsumeQueue) {
+             return false;
+         }
+         try {
+             if (null == bufferConsumeQueue.getByteBuffer()) {
+                 return false;
+             }
+             // The first long of a consume queue entry is the commit log offset.
+             long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
+             return defaultMessageStore.checkInColdAreaByCommitOffset(offsetPy, getMaxOffset());
+         } finally {
+             // NOTE(review): the selected buffer presumably holds a reference on its mapped file
+             // until released - confirm against SelectMappedBufferResult#release.
+             bufferConsumeQueue.release();
+         }
+     } catch (Exception e) {
+         log.error("isMsgInColdArea group: {}, topic: {}, queueId: {}, offset: {}",
+             group, topic, queueId, offset, e);
+     }
+     return false;
+ }
+ }
+
+ /**
+  * Applies the given {@code madvise} mode to every mapped commit log file.
+  * Does nothing on Windows.
+  *
+  * @param mode advice value handed to {@code madvise}, e.g. {@code LibC.MADV_RANDOM}
+  */
+ public void scanFileAndSetReadMode(int mode) {
+     if (MixAll.isWindows()) {
+         log.info("windows os stop scanFileAndSetReadMode");
+         return;
+     }
+     try {
+         log.info("scanFileAndSetReadMode mode: {}", mode);
+         for (MappedFile file : mappedFileQueue.getMappedFiles()) {
+             setFileReadMode(file, mode);
+         }
+     } catch (Exception e) {
+         log.error("scanFileAndSetReadMode exception", e);
+     }
+ }
+
+ /**
+  * Calls {@code madvise} with {@code mode} on the whole mapped buffer of one file.
+  *
+  * @param mappedFile target file; {@code null} is logged and rejected
+  * @param mode advice value handed to {@code madvise}
+  * @return the {@code madvise} return code, or -1 when {@code mappedFile} is null
+  */
+ private int setFileReadMode(MappedFile mappedFile, int mode) {
+     if (mappedFile == null) {
+         log.error("setFileReadMode mappedFile is null");
+         return -1;
+     }
+     final Pointer start = new Pointer(((DirectBuffer) mappedFile.getMappedByteBuffer()).address());
+     final NativeLong length = new NativeLong(mappedFile.getFileSize());
+     final int rc = LibC.INSTANCE.madvise(start, length, mode);
+     if (rc != 0) {
+         log.error("setFileReadMode error fileName: {}, madvise: {}, mode:{}", mappedFile.getFileName(), rc, mode);
+     }
+     return rc;
+ }
+
+ /**
+ * Returns the cold data check service held by this commit log.
+ */
+ public ColdDataCheckService getColdDataCheckService() {
+ return coldDataCheckService;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 6b0516b04f..acc1610e08 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -886,6 +886,10 @@ public class DefaultMessageStore implements MessageStore {
continue;
}
+ if
(messageStoreConfig.isColdDataFlowControlEnable() &&
!MixAll.isSysConsumerGroupForNoColdReadLimit(group) &&
!selectResult.isInCache()) {
+
getResult.setColdDataSum(getResult.getColdDataSum() + sizePy);
+ }
+
if (messageFilter != null
&&
!messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(),
null)) {
if (getResult.getBufferTotalSize() == 0) {
@@ -1705,6 +1709,16 @@ public class DefaultMessageStore implements MessageStore
{
return offsetPy >= commitLog.getMinOffset();
}
+ /**
+ * Checks whether a commit log offset lies in the cold area, i.e. whether it trails
+ * {@code maxOffsetPy} by more than the hot-data budget. The budget is
+ * {@code StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE} scaled by the configured
+ * {@code accessMessageInMemoryHotRatio} percentage.
+ * <p>
+ * The ratio is an empirical estimate, so the result is only approximate and
+ * may not fit every workload.
+ *
+ * @param offsetPy physical commit log offset being checked
+ * @param maxOffsetPy physical offset the distance is measured from
+ * @return {@code true} if {@code maxOffsetPy - offsetPy} exceeds the hot-data budget
+ */
+ public boolean checkInColdAreaByCommitOffset(long offsetPy, long
maxOffsetPy) {
+ long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE *
(this.messageStoreConfig.getAccessMessageInMemoryHotRatio() / 100.0));
+ return (maxOffsetPy - offsetPy) > memory;
+ }
+
private boolean isTheBatchFull(int sizePy, int unitBatchNum, int
maxMsgNums, long maxMsgSize, int bufferTotal,
int messageTotal, boolean isInMem) {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
index 724ffdd878..a7556dfb85 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
@@ -41,6 +41,8 @@ public class GetMessageResult {
private int msgCount4Commercial = 0;
private int commercialSizePerMsg = 4 * 1024;
+ private long coldDataSum = 0L;
+
public static final GetMessageResult NO_MATCH_LOGIC_QUEUE =
new GetMessageResult(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, 0, 0, 0,
Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
@@ -167,6 +169,14 @@ public class GetMessageResult {
return messageQueueOffset;
}
+ public long getColdDataSum() {
+ return coldDataSum;
+ }
+
+ public void setColdDataSum(long coldDataSum) {
+ this.coldDataSum = coldDataSum;
+ }
+
@Override
public String toString() {
return "GetMessageResult [status=" + status + ", nextBeginOffset=" +
nextBeginOffset + ", minOffset="
diff --git
a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
index 317f536058..5c38cfe92a 100644
---
a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
+++
b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
@@ -29,6 +29,8 @@ public class SelectMappedBufferResult {
protected MappedFile mappedFile;
+ private boolean isInCache = true;
+
public SelectMappedBufferResult(long startOffset, ByteBuffer byteBuffer,
int size, MappedFile mappedFile) {
this.startOffset = startOffset;
this.byteBuffer = byteBuffer;
@@ -74,4 +76,12 @@ public class SelectMappedBufferResult {
long pos = startOffset - mappedFile.getFileFromOffset();
return mappedFile.isLoaded(pos, size);
}
+
+ public boolean isInCache() {
+ return isInCache;
+ }
+
+ public void setInCache(boolean inCache) {
+ isInCache = inCache;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index f5ad70543c..099be93051 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -379,6 +379,12 @@ public class MessageStoreConfig {
*/
private int sampleCountThreshold = 5000;
+ private boolean coldDataFlowControlEnable = false;
+ private boolean coldDataScanEnable = false;
+ private boolean dataReadAheadEnable = false;
+ private int timerColdDataCheckIntervalMs = 60 * 1000;
+ private int sampleSteps = 32;
+ private int accessMessageInMemoryHotRatio = 26;
/**
* Build ConsumeQueue concurrently with multi-thread
*/
@@ -1641,6 +1647,54 @@ public class MessageStoreConfig {
this.sampleCountThreshold = sampleCountThreshold;
}
+ public boolean isColdDataFlowControlEnable() {
+ return coldDataFlowControlEnable;
+ }
+
+ public void setColdDataFlowControlEnable(boolean
coldDataFlowControlEnable) {
+ this.coldDataFlowControlEnable = coldDataFlowControlEnable;
+ }
+
+ public boolean isColdDataScanEnable() {
+ return coldDataScanEnable;
+ }
+
+ public void setColdDataScanEnable(boolean coldDataScanEnable) {
+ this.coldDataScanEnable = coldDataScanEnable;
+ }
+
+ public int getTimerColdDataCheckIntervalMs() {
+ return timerColdDataCheckIntervalMs;
+ }
+
+ public void setTimerColdDataCheckIntervalMs(int
timerColdDataCheckIntervalMs) {
+ this.timerColdDataCheckIntervalMs = timerColdDataCheckIntervalMs;
+ }
+
+ public int getSampleSteps() {
+ return sampleSteps;
+ }
+
+ public void setSampleSteps(int sampleSteps) {
+ this.sampleSteps = sampleSteps;
+ }
+
+ public int getAccessMessageInMemoryHotRatio() {
+ return accessMessageInMemoryHotRatio;
+ }
+
+ public void setAccessMessageInMemoryHotRatio(int
accessMessageInMemoryHotRatio) {
+ this.accessMessageInMemoryHotRatio = accessMessageInMemoryHotRatio;
+ }
+
+ public boolean isDataReadAheadEnable() {
+ return dataReadAheadEnable;
+ }
+
+ public void setDataReadAheadEnable(boolean dataReadAheadEnable) {
+ this.dataReadAheadEnable = dataReadAheadEnable;
+ }
+
public boolean isEnableBuildConsumeQueueConcurrently() {
return enableBuildConsumeQueueConcurrently;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
b/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
index 8eaa44ab46..4c8e7d4538 100644
--- a/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
+++ b/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
@@ -25,6 +25,8 @@ import com.sun.jna.Pointer;
public interface LibC extends Library {
LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt"
: "c", LibC.class);
+ int MADV_NORMAL = 0;
+ int MADV_RANDOM = 1;
int MADV_WILLNEED = 3;
int MADV_DONTNEED = 4;
@@ -50,4 +52,8 @@ public interface LibC extends Library {
int mlockall(int flags);
int msync(Pointer p, NativeLong length, int flags);
+
+ int mincore(Pointer p, NativeLong length, byte[] vec);
+
+ int getpagesize();
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 2ca7d667bb..dd9c6a9b45 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -844,4 +844,32 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
String brokerControllerIdsToClean, boolean isCleanLivingBroker) throws
RemotingException, InterruptedException, MQBrokerException {
this.defaultMQAdminExtImpl.cleanControllerBrokerData(controllerAddr,
clusterName, brokerName, brokerControllerIdsToClean, isCleanLivingBroker);
}
+
+ @Override
+ public void updateColdDataFlowCtrGroupConfig(String brokerAddr, Properties
properties)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException,
+ UnsupportedEncodingException, InterruptedException, MQBrokerException {
+ defaultMQAdminExtImpl.updateColdDataFlowCtrGroupConfig(brokerAddr,
properties);
+ }
+
+ @Override
+ public void removeColdDataFlowCtrGroupConfig(String brokerAddr, String
consumerGroup)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException,
+ UnsupportedEncodingException, InterruptedException, MQBrokerException {
+ defaultMQAdminExtImpl.removeColdDataFlowCtrGroupConfig(brokerAddr,
consumerGroup);
+ }
+
+ @Override
+ public String getColdDataFlowCtrInfo(String brokerAddr)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException,
+ UnsupportedEncodingException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExtImpl.getColdDataFlowCtrInfo(brokerAddr);
+ }
+
+ @Override
+ public String setCommitLogReadAheadMode(String brokerAddr, String mode)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException,
+ InterruptedException, MQBrokerException {
+ return defaultMQAdminExtImpl.setCommitLogReadAheadMode(brokerAddr,
mode);
+ }
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 1d4b1bbfc7..c5c467bf00 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -1905,4 +1905,31 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
public MQClientInstance getMqClientInstance() {
return mqClientInstance;
}
+
+ @Override
+ public void updateColdDataFlowCtrGroupConfig(String brokerAddr, Properties
properties)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException,
+ UnsupportedEncodingException, InterruptedException, MQBrokerException {
+
this.mqClientInstance.getMQClientAPIImpl().updateColdDataFlowCtrGroupConfig(brokerAddr,
properties, timeoutMillis);
+ }
+
+ @Override
+ public void removeColdDataFlowCtrGroupConfig(String brokerAddr, String
consumerGroup)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException,
+ UnsupportedEncodingException, InterruptedException, MQBrokerException {
+
this.mqClientInstance.getMQClientAPIImpl().removeColdDataFlowCtrGroupConfig(brokerAddr,
consumerGroup, timeoutMillis);
+ }
+
+ @Override
+ public String getColdDataFlowCtrInfo(String brokerAddr)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException,
+ UnsupportedEncodingException, InterruptedException, MQBrokerException {
+ return
this.mqClientInstance.getMQClientAPIImpl().getColdDataFlowCtrInfo(brokerAddr,
timeoutMillis);
+ }
+
+ @Override
+ public String setCommitLogReadAheadMode(String brokerAddr, String mode)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException {
+ return
this.mqClientInstance.getMQClientAPIImpl().setCommitLogReadAheadMode(brokerAddr,
mode, timeoutMillis);
+ }
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 01f985496c..7dcfc4fa5e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -473,4 +473,16 @@ public interface MQAdminExt extends MQAdmin {
String brokerControllerIdsToClean,
boolean isCleanLivingBroker) throws RemotingException,
InterruptedException, MQBrokerException;
+ void updateColdDataFlowCtrGroupConfig(final String brokerAddr, final
Properties properties)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, UnsupportedEncodingException, InterruptedException,
MQBrokerException;
+
+ void removeColdDataFlowCtrGroupConfig(final String brokerAddr, final
String consumerGroup)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, UnsupportedEncodingException, InterruptedException,
MQBrokerException;
+
+ String getColdDataFlowCtrInfo(final String brokerAddr)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, UnsupportedEncodingException, InterruptedException,
MQBrokerException;
+
+ String setCommitLogReadAheadMode(final String brokerAddr, String mode)
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, UnsupportedEncodingException, InterruptedException,
MQBrokerException;
+
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index fd64d69c73..0c2618e91c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -36,12 +36,16 @@ import
org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad;
import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand;
import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand;
+import
org.apache.rocketmq.tools.command.broker.CommitLogSetReadAheadSubCommand;
import
org.apache.rocketmq.tools.command.broker.DeleteExpiredCommitLogSubCommand;
import org.apache.rocketmq.tools.command.broker.GetBrokerConfigCommand;
import org.apache.rocketmq.tools.command.broker.GetBrokerEpochSubCommand;
+import
org.apache.rocketmq.tools.command.broker.GetColdDataFlowCtrInfoSubCommand;
+import
org.apache.rocketmq.tools.command.broker.RemoveColdDataFlowCtrGroupConfigSubCommand;
import
org.apache.rocketmq.tools.command.broker.ResetMasterFlushOffsetSubCommand;
import org.apache.rocketmq.tools.command.broker.SendMsgStatusCommand;
import org.apache.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand;
+import
org.apache.rocketmq.tools.command.broker.UpdateColdDataFlowCtrGroupConfigSubCommand;
import org.apache.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand;
import org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand;
import
org.apache.rocketmq.tools.command.connection.ConsumerConnectionSubCommand;
@@ -263,6 +267,11 @@ public class MQAdminStartup {
initCommand(new ReElectMasterSubCommand());
initCommand(new CleanControllerBrokerMetaSubCommand());
initCommand(new DumpCompactionLogCommand());
+
+ initCommand(new GetColdDataFlowCtrInfoSubCommand());
+ initCommand(new UpdateColdDataFlowCtrGroupConfigSubCommand());
+ initCommand(new RemoveColdDataFlowCtrGroupConfigSubCommand());
+ initCommand(new CommitLogSetReadAheadSubCommand());
}
private static void printHelp() {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java
new file mode 100644
index 0000000000..b00c7f5f58
--- /dev/null
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CommitLogSetReadAheadSubCommand.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * Admin sub-command that sets the commit log read-ahead mode on a single broker
+ * ({@code -b}) or on every master and slave of a cluster ({@code -c}).
+ */
+public class CommitLogSetReadAheadSubCommand implements SubCommand {
+    /** Accepted values of the {@code -m} option. */
+    private static final String MADV_RANDOM = "1";
+    private static final String MADV_NORMAL = "0";
+
+    @Override
+    public String commandName() {
+        return "setCommitLogReadAheadMode";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "set read ahead mode for all commitlog files";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "set which broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "set which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("m", "commitLogReadAheadMode", true, "set the CommitLog read ahead mode; 0 is default, 1 random read");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Validates the requested mode and sends it to the broker given by {@code -b}, or to
+     * every master and slave of the cluster given by {@code -c}.
+     *
+     * @throws SubCommandException if any step of the command fails
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+        throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            String mode = commandLine.getOptionValue('m').trim();
+            if (!mode.equals(MADV_RANDOM) && !mode.equals(MADV_NORMAL)) {
+                System.out.printf("set the read mode error; 0 is default, 1 random read\n");
+                return;
+            }
+            if (commandLine.hasOption('b')) {
+                String brokerAddr = commandLine.getOptionValue('b').trim();
+                defaultMQAdminExt.start();
+                setAndPrint(defaultMQAdminExt, String.format("============%s============\n", brokerAddr), brokerAddr, mode);
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                defaultMQAdminExt.start();
+                Map<String, List<String>> masterAndSlaveMap = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
+                for (String masterAddr : masterAndSlaveMap.keySet()) {
+                    setAndPrint(defaultMQAdminExt, String.format("============Master: %s============\n", masterAddr), masterAddr, mode);
+                    for (String slaveAddr : masterAndSlaveMap.get(masterAddr)) {
+                        setAndPrint(defaultMQAdminExt, String.format("============My Master: %s=====Slave: %s============\n", masterAddr, slaveAddr), slaveAddr, mode);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    /**
+     * Sends the mode to one broker and prints the prefix followed by the broker's answer.
+     *
+     * @param defaultMQAdminExt admin client used for the request
+     * @param printPrefix banner printed before the result
+     * @param addr broker address
+     * @param mode read-ahead mode to set
+     */
+    protected void setAndPrint(final MQAdminExt defaultMQAdminExt, final String printPrefix, final String addr, final String mode)
+        throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException {
+        System.out.print(" " + printPrefix);
+        // Print the response verbatim: using it as a printf format string would throw or
+        // mangle the output as soon as it contains a '%'.
+        System.out.print("commitLog set readAhead mode rstStr" + defaultMQAdminExt.setCommitLogReadAheadMode(addr, mode) + "\n");
+    }
+}
\ No newline at end of file
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java
new file mode 100644
index 0000000000..7c54e650c3
--- /dev/null
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetColdDataFlowCtrInfoSubCommand.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.io.UnsupportedEncodingException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.MQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * Admin sub-command that fetches the cold data flow control info from a single broker
+ * ({@code -b}) or from every master and slave of a cluster ({@code -c}) and prints it
+ * as formatted JSON.
+ */
+public class GetColdDataFlowCtrInfoSubCommand implements SubCommand {
+    /** Formats the millisecond timestamps of the runtime table for display. */
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    @Override
+    public String commandName() {
+        return "getColdDataFlowCtrInfo";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "get cold data flow ctr info";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("b", "brokerAddr", true, "get from which broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "get from which cluster");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Queries the broker given by {@code -b}, or every master and slave of the cluster
+     * given by {@code -c}, and prints each result.
+     *
+     * @throws SubCommandException if any step of the command fails
+     */
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook)
+        throws SubCommandException {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            if (commandLine.hasOption('b')) {
+                String brokerAddr = commandLine.getOptionValue('b').trim();
+                defaultMQAdminExt.start();
+                getAndPrint(defaultMQAdminExt, String.format("============%s============\n", brokerAddr), brokerAddr);
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+                defaultMQAdminExt.start();
+                Map<String, List<String>> masterAndSlaveMap = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
+                for (String masterAddr : masterAndSlaveMap.keySet()) {
+                    getAndPrint(defaultMQAdminExt, String.format("============Master: %s============\n", masterAddr), masterAddr);
+                    for (String slaveAddr : masterAndSlaveMap.get(masterAddr)) {
+                        getAndPrint(defaultMQAdminExt, String.format("============My Master: %s=====Slave: %s============\n", masterAddr, slaveAddr), slaveAddr);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    /**
+     * Fetches the info from one broker, replaces the millisecond timestamps of each
+     * {@code runtimeTable} entry with formatted dates and prints the result as JSON.
+     *
+     * @param defaultMQAdminExt admin client used for the request
+     * @param printPrefix banner printed before the result
+     * @param addr broker address
+     */
+    protected void getAndPrint(final MQAdminExt defaultMQAdminExt, final String printPrefix, final String addr)
+        throws InterruptedException, RemotingConnectException,
+        UnsupportedEncodingException, RemotingTimeoutException,
+        MQBrokerException, RemotingSendRequestException {
+
+        System.out.print(" " + printPrefix);
+        String rstStr = defaultMQAdminExt.getColdDataFlowCtrInfo(addr);
+        if (rstStr == null) {
+            System.out.printf("Broker[%s] has no cold ctr table !\n", addr);
+            return;
+        }
+        JSONObject jsonObject = JSON.parseObject(rstStr);
+        Map<String, JSONObject> runtimeTable = (Map<String, JSONObject>) jsonObject.get("runtimeTable");
+        // A response without a runtime table is printed as-is instead of failing with an NPE.
+        if (runtimeTable != null) {
+            runtimeTable.entrySet().stream().forEach(i -> {
+                JSONObject value = i.getValue();
+                Date lastColdReadTimeMillsDate = new Date(Long.parseLong(String.valueOf(value.get("lastColdReadTimeMills"))));
+                value.put("lastColdReadTimeFormat", sdf.format(lastColdReadTimeMillsDate));
+                value.remove("lastColdReadTimeMills");
+
+                Date createTimeMillsDate = new Date(Long.parseLong(String.valueOf(value.get("createTimeMills"))));
+                value.put("createTimeFormat", sdf.format(createTimeMillsDate));
+                value.remove("createTimeMills");
+            });
+        }
+
+        String formatStr = JSON.toJSONString(jsonObject, true);
+        // Print the JSON verbatim: as a printf format string any '%' in it would throw
+        // or be rewritten.
+        System.out.print(formatStr);
+        System.out.printf("%n");
+    }
+
+}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java
new file mode 100644
index 0000000000..b0477924f2
--- /dev/null
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/RemoveColdDataFlowCtrGroupConfigSubCommand.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class RemoveColdDataFlowCtrGroupConfigSubCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "removeColdDataFlowCtrGroupConfig";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "remove consumer from cold ctr config";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("b", "brokerAddr", true, "update which
broker");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "update which cluster");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("g", "consumerGroup", true, "the consumer group will
remove from the config");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook
rpcHook) throws SubCommandException {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ String consumerGroup = commandLine.getOptionValue('g').trim();
+ if (commandLine.hasOption('b')) {
+ String brokerAddr = commandLine.getOptionValue('b').trim();
+ defaultMQAdminExt.start();
+ defaultMQAdminExt.removeColdDataFlowCtrGroupConfig(brokerAddr,
consumerGroup);
+ System.out.printf("remove broker cold read threshold success,
%s\n", brokerAddr);
+ return;
+ } else if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+ defaultMQAdminExt.start();
+ Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String brokerAddr : masterSet) {
+ try {
+
defaultMQAdminExt.removeColdDataFlowCtrGroupConfig(brokerAddr, consumerGroup);
+ System.out.printf("remove broker cold read threshold
success, %s\n", brokerAddr);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ return;
+ }
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(),
options);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + "
command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java
new file mode 100644
index 0000000000..d06a24b579
--- /dev/null
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateColdDataFlowCtrGroupConfigSubCommand.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.broker;
+
+
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * mqadmin sub-command that adds or updates the cold read threshold of a consumer group,
+ * on one broker ({@code -b}) or on all master brokers of a cluster ({@code -c}).
+ * Without either option the command-line help is printed.
+ */
+public class UpdateColdDataFlowCtrGroupConfigSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "updateColdDataFlowCtrGroupConfig";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "addOrUpdate cold data flow ctr group config";
+    }
+
+    /**
+     * Adds this command's options in the order {@code -b}, {@code -c}, {@code -g}, {@code -v};
+     * only the consumer group and the threshold are mandatory.
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        options.addOption(newOption("b", "brokerAddr", "update which broker", false));
+        options.addOption(newOption("c", "clusterName", "update which cluster", false));
+        options.addOption(newOption("g", "consumerGroup", "specific consumerGroup", true));
+        options.addOption(newOption("v", "threshold", "cold read threshold value", true));
+        return options;
+    }
+
+    /** Builds an option that takes an argument, with the given required flag. */
+    private static Option newOption(String shortName, String longName, String description, boolean required) {
+        Option option = new Option(shortName, longName, true, description);
+        option.setRequired(required);
+        return option;
+    }
+
+    /**
+     * Sends the consumer group / threshold pair to the selected broker(s).
+     *
+     * <p>A failure in single-broker mode aborts the command; in cluster mode each master is
+     * attempted independently and a failure only prints its stack trace.
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        final DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            final String group = commandLine.getOptionValue('g').trim();
+            final String threshold = commandLine.getOptionValue('v').trim();
+            final Properties groupConfig = new Properties();
+            groupConfig.setProperty(group, threshold);
+
+            final boolean singleBroker = commandLine.hasOption('b');
+            if (!singleBroker && !commandLine.hasOption('c')) {
+                // No target given: show usage; the finally block still shuts the client down.
+                ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+                return;
+            }
+
+            if (singleBroker) {
+                final String targetAddr = commandLine.getOptionValue('b').trim();
+                adminExt.start();
+                adminExt.updateColdDataFlowCtrGroupConfig(targetAddr, groupConfig);
+                System.out.printf("updateColdDataFlowCtrGroupConfig success, %s\n", targetAddr);
+                return;
+            }
+
+            final String cluster = commandLine.getOptionValue('c').trim();
+            adminExt.start();
+            final Set<String> masters = CommandUtil.fetchMasterAddrByClusterName(adminExt, cluster);
+            for (final String masterAddr : masters) {
+                try {
+                    adminExt.updateColdDataFlowCtrGroupConfig(masterAddr, groupConfig);
+                    System.out.printf("updateColdDataFlowCtrGroupConfig success, %s\n", masterAddr);
+                } catch (Exception perBrokerFailure) {
+                    perBrokerFailure.printStackTrace();
+                }
+            }
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+}
\ No newline at end of file