zk-drizzle commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1194891440
##########
store/src/main/java/org/apache/rocketmq/store/CommitLog.java:
##########
@@ -1966,4 +2000,196 @@ public void cleanSwappedMap(long
forceCleanSwapIntervalMs) {
public FlushManager getFlushManager() {
return flushManager;
}
+
+ public class ColdDataCheckService extends ServiceThread {
+ private final SystemClock systemClock = new SystemClock();
+ private final ConcurrentHashMap<String, byte[]> pageCacheMap = new
ConcurrentHashMap<>();
+ private int pageSize = -1;
+ private int sampleSteps = 32;
+
+ public ColdDataCheckService() {
+ sampleSteps =
defaultMessageStore.getMessageStoreConfig().getSampleSteps();
+ if (sampleSteps <= 0) {
+ sampleSteps = 32;
+ }
+ initPageSize();
+ scanFilesInPageCache();
+ }
+
+ @Override
+ public String getServiceName() {
+ return ColdDataCheckService.class.getSimpleName();
+ }
+
+ @Override
+ public void run() {
+ log.info("{} service started", this.getServiceName());
+ while (!this.isStopped()) {
+ try {
+ if
(!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() ||
!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+ pageCacheMap.clear();
+ this.waitForRunning(180 * 1000);
+ continue;
+ } else {
+
this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
+ }
+ long beginClockTimestamp = this.systemClock.now();
+ scanFilesInPageCache();
+ long costTime = this.systemClock.now() -
beginClockTimestamp;
+ log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime
> 30 * 1000 ? "NOTIFYME" : "OK", costTime);
+ } catch (Throwable e) {
+ log.warn(this.getServiceName() + " service has e: {}", e);
+ }
+ }
+ log.info("{} service end", this.getServiceName());
+ }
+
+ public boolean isDataInPageCache(final long offset) {
+ if
(!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+ return true;
+ }
+ if (pageSize <= 0 || sampleSteps <= 0) {
+ return true;
+ }
+ if (!defaultMessageStore.checkInColdAreaByCommitOffset(offset,
getMaxOffset())) {
+ return true;
+ }
+ if
(!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+ return false;
+ }
+
+ MappedFile mappedFile =
mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+ if (null == mappedFile) {
+ return true;
+ }
+ byte[] bytes = pageCacheMap.get(mappedFile.getFileName());
+ if (null == bytes) {
+ return true;
+ }
+
+ int pos = (int)(offset %
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
+ int realIndex = pos / pageSize / sampleSteps;
+ return bytes.length - 1 >= realIndex && bytes[realIndex] != 0;
+ }
+
+ private void scanFilesInPageCache() {
+ if
(!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() ||
!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+ return;
+ }
+
+ try {
+ log.info("pageCacheMap key size: {}", pageCacheMap.size());
+ clearExpireMappedFile();
+ mappedFileQueue.getMappedFiles().stream().forEach(mappedFile
-> {
+ byte[] pageCacheTable = checkFileInPageCache(mappedFile);
+ if (sampleSteps > 1) {
+ pageCacheTable = sampling(pageCacheTable, sampleSteps);
+ }
+ pageCacheMap.put(mappedFile.getFileName(), pageCacheTable);
+ });
+ } catch (Exception e) {
+ log.error("scanFilesInPageCache exception", e);
+ }
+ }
+
+ private void clearExpireMappedFile() {
+ Set<String> currentFileSet = mappedFileQueue.getMappedFiles()
+
.stream().map(MappedFile::getFileName).collect(Collectors.toSet());
+ pageCacheMap.entrySet().stream().forEach(entry -> {
Review Comment:
Thank you very much for your review. I'd be happy to take your advice.
##########
store/src/main/java/org/apache/rocketmq/store/CommitLog.java:
##########
@@ -1966,4 +2000,196 @@ public void cleanSwappedMap(long
forceCleanSwapIntervalMs) {
public FlushManager getFlushManager() {
return flushManager;
}
+
+ public class ColdDataCheckService extends ServiceThread {
+ private final SystemClock systemClock = new SystemClock();
+ private final ConcurrentHashMap<String, byte[]> pageCacheMap = new
ConcurrentHashMap<>();
+ private int pageSize = -1;
+ private int sampleSteps = 32;
+
+ public ColdDataCheckService() {
+ sampleSteps =
defaultMessageStore.getMessageStoreConfig().getSampleSteps();
+ if (sampleSteps <= 0) {
+ sampleSteps = 32;
+ }
+ initPageSize();
+ scanFilesInPageCache();
+ }
+
+ @Override
+ public String getServiceName() {
+ return ColdDataCheckService.class.getSimpleName();
+ }
+
+ @Override
+ public void run() {
+ log.info("{} service started", this.getServiceName());
+ while (!this.isStopped()) {
+ try {
+ if
(!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() ||
!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+ pageCacheMap.clear();
+ this.waitForRunning(180 * 1000);
+ continue;
+ } else {
+
this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
+ }
+ long beginClockTimestamp = this.systemClock.now();
+ scanFilesInPageCache();
+ long costTime = this.systemClock.now() -
beginClockTimestamp;
+ log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime
> 30 * 1000 ? "NOTIFYME" : "OK", costTime);
+ } catch (Throwable e) {
+ log.warn(this.getServiceName() + " service has e: {}", e);
+ }
+ }
+ log.info("{} service end", this.getServiceName());
+ }
+
+ public boolean isDataInPageCache(final long offset) {
+ if
(!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+ return true;
+ }
+ if (pageSize <= 0 || sampleSteps <= 0) {
+ return true;
+ }
+ if (!defaultMessageStore.checkInColdAreaByCommitOffset(offset,
getMaxOffset())) {
+ return true;
+ }
+ if
(!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+ return false;
+ }
+
+ MappedFile mappedFile =
mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+ if (null == mappedFile) {
+ return true;
+ }
+ byte[] bytes = pageCacheMap.get(mappedFile.getFileName());
+ if (null == bytes) {
+ return true;
+ }
+
+ int pos = (int)(offset %
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
+ int realIndex = pos / pageSize / sampleSteps;
+ return bytes.length - 1 >= realIndex && bytes[realIndex] != 0;
+ }
+
+ private void scanFilesInPageCache() {
+ if
(!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() ||
!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+ return;
+ }
+
+ try {
+ log.info("pageCacheMap key size: {}", pageCacheMap.size());
+ clearExpireMappedFile();
+ mappedFileQueue.getMappedFiles().stream().forEach(mappedFile
-> {
Review Comment:
Thank you very much for your review. I'd be happy to take your advice.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]