This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 654c12ed5 [ISSUE #6430] Scan topic.json to find compactionTopic and copy it
654c12ed5 is described below
commit 654c12ed56224cabca06768d4f75be29baf9ea1f
Author: guyinyou <[email protected]>
AuthorDate: Mon Mar 27 15:25:29 2023 +0800
[ISSUE #6430] Scan topic.json to find compactionTopic and copy it
Co-authored-by: guyinyou <[email protected]>
---
.../apache/rocketmq/store/DefaultMessageStore.java | 5 +
.../org/apache/rocketmq/store/MessageStore.java | 140 +++++++++++----------
.../apache/rocketmq/store/kv/CompactionStore.java | 48 +++++--
.../store/plugin/AbstractPluginMessageStore.java | 8 ++
.../rocketmq/store/queue/ConsumeQueueStore.java | 6 +-
5 files changed, 135 insertions(+), 72 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 403cb9ad1..dcdae008c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2049,6 +2049,11 @@ public class DefaultMessageStore implements MessageStore {
}
}
+ @Override
+ public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
+ return this.consumeQueueStore.getTopicConfigs();
+ }
+
@Override
public Optional<TopicConfig> getTopicConfig(String topic) {
return this.consumeQueueStore.getTopicConfig(topic);
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 8e86f8abe..f77739fc4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -20,14 +20,17 @@ import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.View;
+
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.TopicConfig;
@@ -116,11 +119,11 @@ public interface MessageStore {
* Query at most <code>maxMsgNums</code> messages belonging to
<code>topic</code> at <code>queueId</code> starting
* from given <code>offset</code>. Resulting messages will further be
screened using provided message filter.
*
- * @param group Consumer group that launches this query.
- * @param topic Topic to query.
- * @param queueId Queue ID to query.
- * @param offset Logical offset to start from.
- * @param maxMsgNums Maximum count of messages to query.
+ * @param group Consumer group that launches this query.
+ * @param topic Topic to query.
+ * @param queueId Queue ID to query.
+ * @param offset Logical offset to start from.
+ * @param maxMsgNums Maximum count of messages to query.
* @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
@@ -131,11 +134,11 @@ public interface MessageStore {
* Asynchronous get message
* @see #getMessage(String, String, int, long, int, MessageFilter)
getMessage
*
- * @param group Consumer group that launches this query.
- * @param topic Topic to query.
- * @param queueId Queue ID to query.
- * @param offset Logical offset to start from.
- * @param maxMsgNums Maximum count of messages to query.
+ * @param group Consumer group that launches this query.
+ * @param topic Topic to query.
+ * @param queueId Queue ID to query.
+ * @param offset Logical offset to start from.
+ * @param maxMsgNums Maximum count of messages to query.
* @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
@@ -146,13 +149,13 @@ public interface MessageStore {
* Query at most <code>maxMsgNums</code> messages belonging to
<code>topic</code> at <code>queueId</code> starting
* from given <code>offset</code>. Resulting messages will further be
screened using provided message filter.
*
- * @param group Consumer group that launches this query.
- * @param topic Topic to query.
- * @param queueId Queue ID to query.
- * @param offset Logical offset to start from.
- * @param maxMsgNums Maximum count of messages to query.
+ * @param group Consumer group that launches this query.
+ * @param topic Topic to query.
+ * @param queueId Queue ID to query.
+ * @param offset Logical offset to start from.
+ * @param maxMsgNums Maximum count of messages to query.
* @param maxTotalMsgSize Maximum total msg size of the messages
- * @param messageFilter Message filter used to screen desired messages.
+ * @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
GetMessageResult getMessage(final String group, final String topic, final
int queueId,
@@ -162,13 +165,13 @@ public interface MessageStore {
* Asynchronous get message
* @see #getMessage(String, String, int, long, int, int, MessageFilter)
getMessage
*
- * @param group Consumer group that launches this query.
- * @param topic Topic to query.
- * @param queueId Queue ID to query.
- * @param offset Logical offset to start from.
- * @param maxMsgNums Maximum count of messages to query.
+ * @param group Consumer group that launches this query.
+ * @param topic Topic to query.
+ * @param queueId Queue ID to query.
+ * @param offset Logical offset to start from.
+ * @param maxMsgNums Maximum count of messages to query.
* @param maxTotalMsgSize Maximum total msg size of the messages
- * @param messageFilter Message filter used to screen desired messages.
+ * @param messageFilter Message filter used to screen desired messages.
* @return Matched messages.
*/
CompletableFuture<GetMessageResult> getMessageAsync(final String group,
final String topic, final int queueId,
@@ -177,7 +180,7 @@ public interface MessageStore {
/**
* Get maximum offset of the topic queue.
*
- * @param topic Topic name.
+ * @param topic Topic name.
* @param queueId Queue ID.
* @return Maximum offset at present.
*/
@@ -186,8 +189,8 @@ public interface MessageStore {
/**
* Get maximum offset of the topic queue.
*
- * @param topic Topic name.
- * @param queueId Queue ID.
+ * @param topic Topic name.
+ * @param queueId Queue ID.
* @param committed return the max offset in ConsumeQueue if true, or the
max offset in CommitLog if false
* @return Maximum offset at present.
*/
@@ -196,7 +199,7 @@ public interface MessageStore {
/**
* Get the minimum offset of the topic queue.
*
- * @param topic Topic name.
+ * @param topic Topic name.
* @param queueId Queue ID.
* @return Minimum offset at present.
*/
@@ -209,8 +212,8 @@ public interface MessageStore {
/**
* Get the offset of the message in the commit log, which is also known as
physical offset.
*
- * @param topic Topic of the message to lookup.
- * @param queueId Queue ID.
+ * @param topic Topic of the message to lookup.
+ * @param queueId Queue ID.
* @param consumeQueueOffset offset of consume queue.
* @return physical offset.
*/
@@ -219,8 +222,8 @@ public interface MessageStore {
/**
* Look up the physical offset of the message whose store timestamp is as
specified.
*
- * @param topic Topic of the message.
- * @param queueId Queue ID.
+ * @param topic Topic of the message.
+ * @param queueId Queue ID.
* @param timestamp Timestamp to look up.
* @return physical offset which matches.
*/
@@ -238,7 +241,7 @@ public interface MessageStore {
* Look up the message by given commit log offset and size.
*
* @param commitLogOffset physical offset.
- * @param size message size
+ * @param size message size
* @return Message whose physical offset is as specified.
*/
MessageExt lookMessageByOffset(long commitLogOffset, int size);
@@ -255,7 +258,7 @@ public interface MessageStore {
* Get one message from the specified commit log offset.
*
* @param commitLogOffset commit log offset.
- * @param msgSize message size.
+ * @param msgSize message size.
* @return wrapped result of the message.
*/
SelectMappedBufferResult selectOneMessageByOffset(final long
commitLogOffset, final int msgSize);
@@ -266,7 +269,9 @@ public interface MessageStore {
* @return message store running info.
*/
String getRunningDataInfo();
+
long getTimingMessageCount(String topic);
+
/**
* Message store runtime information, which should generally contains
various statistical information.
*
@@ -297,7 +302,7 @@ public interface MessageStore {
/**
* Get the store time of the earliest message in the given queue.
*
- * @param topic Topic of the messages to query.
+ * @param topic Topic of the messages to query.
* @param queueId Queue ID to find.
* @return store time of the earliest message.
*/
@@ -321,8 +326,8 @@ public interface MessageStore {
/**
* Get the store time of the message specified.
*
- * @param topic message topic.
- * @param queueId queue ID.
+ * @param topic message topic.
+ * @param queueId queue ID.
* @param consumeQueueOffset consume queue offset.
* @return store timestamp of the message.
*/
@@ -332,8 +337,8 @@ public interface MessageStore {
* Asynchronous get the store time of the message specified.
* @see #getMessageStoreTimeStamp(String, int, long)
getMessageStoreTimeStamp
*
- * @param topic message topic.
- * @param queueId queue ID.
+ * @param topic message topic.
+ * @param queueId queue ID.
* @param consumeQueueOffset consume queue offset.
* @return store timestamp of the message.
*/
@@ -343,7 +348,7 @@ public interface MessageStore {
/**
* Get the total number of the messages in the specified queue.
*
- * @param topic Topic
+ * @param topic Topic
* @param queueId Queue ID.
* @return total number.
*/
@@ -361,7 +366,7 @@ public interface MessageStore {
* Get the raw commit log data starting from the given offset, across
multiple mapped files.
*
* @param offset starting offset.
- * @param size size of data to get
+ * @param size size of data to get
* @return commit log data.
*/
List<SelectMappedBufferResult> getBulkCommitLogData(final long offset,
final int size);
@@ -370,9 +375,9 @@ public interface MessageStore {
* Append data to commit log.
*
* @param startOffset starting offset.
- * @param data data to append.
- * @param dataStart the start index of data array
- * @param dataLength the length of data array
+ * @param data data to append.
+ * @param dataStart the start index of data array
+ * @param dataLength the length of data array
* @return true if success; false otherwise.
*/
boolean appendToCommitLog(final long startOffset, final byte[] data, int
dataStart, int dataLength);
@@ -385,11 +390,11 @@ public interface MessageStore {
/**
* Query messages by given key.
*
- * @param topic topic of the message.
- * @param key message key.
+ * @param topic topic of the message.
+ * @param key message key.
* @param maxNum maximum number of the messages possible.
- * @param begin begin timestamp.
- * @param end end timestamp.
+ * @param begin begin timestamp.
+ * @param end end timestamp.
*/
QueryMessageResult queryMessage(final String topic, final String key,
final int maxNum, final long begin,
final long end);
@@ -398,11 +403,11 @@ public interface MessageStore {
* Asynchronous query messages by given key.
* @see #queryMessage(String, String, int, long, long) queryMessage
*
- * @param topic topic of the message.
- * @param key message key.
+ * @param topic topic of the message.
+ * @param key message key.
* @param maxNum maximum number of the messages possible.
- * @param begin begin timestamp.
- * @param end end timestamp.
+ * @param begin begin timestamp.
+ * @param end end timestamp.
*/
CompletableFuture<QueryMessageResult> queryMessageAsync(final String
topic, final String key, final int maxNum,
final long begin, final long end);
@@ -460,11 +465,11 @@ public interface MessageStore {
/**
* Check if the given message has been swapped out of the memory.
*
- * @param topic topic.
- * @param queueId queue ID.
+ * @param topic topic.
+ * @param queueId queue ID.
* @param consumeOffset consume queue offset.
* @return true if the message is no longer in memory; false otherwise.
- * @deprecated As of RIP-57, replaced by {@link
#checkInMemByConsumeOffset(String, int, long, int)}, see <a
href="https://github.com/apache/rocketmq/issues/5837">this issue</a> for more
details
+ * @deprecated As of RIP-57, replaced by {@link
#checkInMemByConsumeOffset(String, int, long, int)}, see <a
href="https://github.com/apache/rocketmq/issues/5837">this issue</a> for more
details
*/
@Deprecated
boolean checkInDiskByConsumeOffset(final String topic, final int queueId,
long consumeOffset);
@@ -570,7 +575,7 @@ public interface MessageStore {
/**
* Get consume queue of the topic/queue. If consume queue not exist, will
return null
*
- * @param topic Topic.
+ * @param topic Topic.
* @param queueId Queue ID.
* @return Consume queue.
*/
@@ -578,7 +583,7 @@ public interface MessageStore {
/**
* Get consume queue of the topic/queue. If consume queue not exist, will
create one then return it.
- * @param topic Topic.
+ * @param topic Topic.
* @param queueId Queue ID.
* @return Consume queue.
*/
@@ -594,8 +599,8 @@ public interface MessageStore {
/**
* Will be triggered when a new message is appended to commit log.
*
- * @param msg the msg that is appended to commit log
- * @param result append message result
+ * @param msg the msg that is appended to commit log
+ * @param result append message result
* @param commitLogFile commit log file
*/
void onCommitLogAppend(MessageExtBrokerInner msg, AppendMessageResult
result, MappedFile commitLogFile);
@@ -604,10 +609,10 @@ public interface MessageStore {
* Will be triggered when a new dispatch request is sent to message store.
*
* @param dispatchRequest dispatch request
- * @param doDispatch do dispatch if true
- * @param commitLogFile commit log file
- * @param isRecover is from recover process
- * @param isFileEnd if the dispatch request represents 'file end'
+ * @param doDispatch do dispatch if true
+ * @param commitLogFile commit log file
+ * @param isRecover is from recover process
+ * @param isFileEnd if the dispatch request represents 'file end'
*/
void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean
doDispatch, MappedFile commitLogFile,
boolean isRecover, boolean isFileEnd);
@@ -726,11 +731,18 @@ public interface MessageStore {
* Assign an queue offset and increase it. If there is a race condition,
you need to lock/unlock this method
* yourself.
*
- * @param msg message
+ * @param msg message
* @param messageNum message num
*/
void assignOffset(MessageExtBrokerInner msg, short messageNum);
+ /**
+ * get all topic config
+ *
+ * @return all topic config info
+ */
+ Map<String, TopicConfig> getTopicConfigs();
+
/**
* get topic config
*
@@ -814,7 +826,7 @@ public interface MessageStore {
* Calculate the checksum of a certain range of data.
*
* @param from begin offset
- * @param to end offset
+ * @param to end offset
* @return checksum
*/
byte[] calcDeltaChecksum(long from, long to);
@@ -956,7 +968,7 @@ public interface MessageStore {
/**
* Init store metrics
*
- * @param meter opentelemetry meter
+ * @param meter opentelemetry meter
* @param attributesBuilderSupplier metrics attributes builder
*/
void initMetrics(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier);
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index b4487753f..1142c8153 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -16,8 +16,16 @@
*/
package org.apache.rocketmq.store.kv;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.CleanupPolicy;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.GetMessageResult;
@@ -29,7 +37,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -48,6 +55,7 @@ public class CompactionStore {
private final CompactionPositionMgr positionMgr;
private final ConcurrentHashMap<String, CompactionLog> compactionLogTable;
private final ScheduledExecutorService compactionSchedule;
+ private final int scanInterval = 30000;
private final int compactionInterval;
private final int compactionThreadNum;
private final int offsetMapSize;
@@ -96,10 +104,7 @@ public class CompactionStore {
int queueId =
Integer.parseInt(fileQueueId.getName());
if (Files.isDirectory(Paths.get(compactionCqPath,
topic, String.valueOf(queueId)))) {
- CompactionLog log = new
CompactionLog(defaultMessageStore, this, topic, queueId);
- log.load(exitOk);
- compactionLogTable.put(topic + "_" + queueId,
log);
-
compactionSchedule.scheduleWithFixedDelay(log::doCompaction,
compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
+ loadAndGetClog(topic, queueId);
} else {
log.error("{}:{} compactionLog mismatch with
compactionCq", topic, queueId);
}
@@ -114,13 +119,37 @@ public class CompactionStore {
}
}
log.info("compactionStore {}:{} load completed.", compactionLogPath,
compactionCqPath);
+
+ compactionSchedule.scheduleWithFixedDelay(this::scanAllTopicConfig,
scanInterval, scanInterval, TimeUnit.MILLISECONDS);
+ log.info("loop to scan all topicConfig with fixed delay {}ms",
scanInterval);
}
- public void putMessage(String topic, int queueId, SelectMappedBufferResult
smr) throws Exception {
+ private void scanAllTopicConfig() {
+ log.info("start to scan all topicConfig");
+ try {
+ Iterator<Map.Entry<String, TopicConfig>> iterator =
defaultMessageStore.getTopicConfigs().entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, TopicConfig> it = iterator.next();
+ TopicConfig topicConfig = it.getValue();
+ CleanupPolicy policy =
CleanupPolicyUtils.getDeletePolicy(Optional.ofNullable(topicConfig));
+ //check topic flag
+ if (Objects.equals(policy, CleanupPolicy.COMPACTION)) {
+ for (int queueId = 0; queueId <
topicConfig.getWriteQueueNums(); queueId++) {
+ loadAndGetClog(it.getKey(), queueId);
+ }
+ }
+ }
+ } catch (Throwable ignore) {
+ // ignore
+ }
+ log.info("scan all topicConfig over");
+ }
+
+ private CompactionLog loadAndGetClog(String topic, int queueId) {
CompactionLog clog = compactionLogTable.compute(topic + "_" + queueId,
(k, v) -> {
if (v == null) {
try {
- v = new CompactionLog(defaultMessageStore,this, topic,
queueId);
+ v = new CompactionLog(defaultMessageStore, this, topic,
queueId);
v.load(true);
compactionSchedule.scheduleWithFixedDelay(v::doCompaction,
compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
} catch (IOException e) {
@@ -130,6 +159,11 @@ public class CompactionStore {
}
return v;
});
+ return clog;
+ }
+
+ public void putMessage(String topic, int queueId, SelectMappedBufferResult
smr) throws Exception {
+ CompactionLog clog = loadAndGetClog(topic, queueId);
if (clog != null) {
clog.asyncPutMessage(smr);
diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 77908c5fa..3f43adc12 100644
--- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -21,14 +21,17 @@ import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.View;
+
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.TopicConfig;
@@ -592,6 +595,11 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
next.assignOffset(msg, messageNum);
}
+ @Override
+ public Map<String, TopicConfig> getTopicConfigs() {
+ return next.getTopicConfigs();
+ }
+
@Override
public Optional<TopicConfig> getTopicConfig(String topic) {
return next.getTopicConfig(topic);
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 8b77f4942..90f2e74aa 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -95,7 +95,7 @@ public class ConsumeQueueStore {
* Apply the dispatched request and build the consume queue. This function
should be idempotent.
*
* @param consumeQueue consume queue
- * @param request dispatch request
+ * @param request dispatch request
*/
public void putMessagePositionInfoWrapper(ConsumeQueueInterface
consumeQueue, DispatchRequest request) {
consumeQueue.putMessagePositionInfoWrapper(request);
@@ -537,6 +537,10 @@ public class ConsumeQueueStore {
}
}
+ public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
+ return this.topicConfigTable;
+ }
+
public Optional<TopicConfig> getTopicConfig(String topic) {
if (this.topicConfigTable == null) {
return Optional.empty();