fuyou001 commented on code in PR #8842:
URL: https://github.com/apache/rocketmq/pull/8842#discussion_r1810534584
##########
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java:
##########
@@ -111,69 +117,147 @@ public class RocksDBConsumeQueueOffsetTable {
/**
* Although we have already put max(min) consumeQueueOffset and
physicalOffset in rocksdb, we still hope to get them
* from heap to avoid accessing rocksdb.
+ *
* @see ConsumeQueue#getMaxPhysicOffset(), maxPhysicOffset -->
topicQueueMaxCqOffset
* @see ConsumeQueue#getMinLogicOffset(), minLogicOffset -->
topicQueueMinOffset
*/
- private final Map<String/* topic-queueId */, PhyAndCQOffset>
topicQueueMinOffset;
- private final Map<String/* topic-queueId */, Long> topicQueueMaxCqOffset;
+ private final ConcurrentMap<String/* topic-queueId */, PhyAndCQOffset>
topicQueueMinOffset;
+ private final ConcurrentMap<String/* topic-queueId */, Long>
topicQueueMaxCqOffset;
public RocksDBConsumeQueueOffsetTable(RocksDBConsumeQueueTable
rocksDBConsumeQueueTable,
ConsumeQueueRocksDBStorage rocksDBStorage, DefaultMessageStore
messageStore) {
this.rocksDBConsumeQueueTable = rocksDBConsumeQueueTable;
this.rocksDBStorage = rocksDBStorage;
this.messageStore = messageStore;
- this.topicQueueMinOffset = new ConcurrentHashMap(1024);
- this.topicQueueMaxCqOffset = new ConcurrentHashMap(1024);
+ this.topicQueueMinOffset = new ConcurrentHashMap<>(1024);
+ this.topicQueueMaxCqOffset = new ConcurrentHashMap<>(1024);
this.maxPhyOffsetBB = ByteBuffer.allocateDirect(8);
}
public void load() {
this.offsetCFH = this.rocksDBStorage.getOffsetCFHandle();
+ loadMaxConsumeQueueOffsets();
}
- public void updateTempTopicQueueMaxOffset(final Pair<ByteBuffer,
ByteBuffer> offsetBBPair,
- final byte[] topicBytes, final DispatchRequest request,
- final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>>
tempTopicQueueMaxOffsetMap) {
- buildOffsetKeyAndValueByteBuffer(offsetBBPair, topicBytes, request);
- ByteBuffer topicQueueId = offsetBBPair.getObject1();
- ByteBuffer maxOffsetBB = offsetBBPair.getObject2();
- Pair<ByteBuffer, DispatchRequest> old =
tempTopicQueueMaxOffsetMap.get(topicQueueId);
- if (old == null) {
- tempTopicQueueMaxOffsetMap.put(topicQueueId, new Pair(maxOffsetBB,
request));
- } else {
- long oldMaxOffset = old.getObject1().getLong(OFFSET_CQ_OFFSET);
- long maxOffset = maxOffsetBB.getLong(OFFSET_CQ_OFFSET);
- if (maxOffset >= oldMaxOffset) {
- ERROR_LOG.error("cqOffset invalid1. old: {}, now: {}",
oldMaxOffset, maxOffset);
+ private void loadMaxConsumeQueueOffsets() {
+ Function<OffsetEntry, Boolean> predicate = entry -> entry.type ==
OffsetEntryType.MAXIMUM;
+ Consumer<OffsetEntry> fn = entry -> {
+ topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" +
entry.queueId, entry.offset);
+ ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic,
entry.queueId, entry.offset, entry.commitLogOffset);
+ };
+ try {
+ forEach(predicate, fn);
+ } catch (RocksDBException e) {
+ log.error("Failed to maximum consume queue offset", e);
+ }
+ }
+
+ public void forEach(Function<OffsetEntry, Boolean> predicate,
Consumer<OffsetEntry> fn) throws RocksDBException {
+ try (RocksIterator iterator = this.rocksDBStorage.seekOffsetCF()) {
+ if (null == iterator) {
+ return;
+ }
+
+ int keyBufferCapacity = 256;
+ iterator.seekToFirst();
+ ByteBuffer keyBuffer =
ByteBuffer.allocateDirect(keyBufferCapacity);
+ ByteBuffer valueBuffer = ByteBuffer.allocateDirect(16);
+ while (iterator.isValid()) {
+ // parse key buffer according to key layout
+ keyBuffer.clear(); // clear position and limit before reuse
+ int total = iterator.key(keyBuffer);
+ if (total > keyBufferCapacity) {
Review Comment:
I get it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]