HScarb commented on code in PR #7065:
URL: https://github.com/apache/rocketmq/pull/7065#discussion_r1275705409


##########
store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java:
##########
@@ -0,0 +1,1296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.queue;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.FixedSizeCache;
+import org.apache.rocketmq.store.RocksDBConsumeQueue;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.rocksdb.ConsumeQueueRocksDBStorage;
+import org.apache.rocketmq.store.timer.TimerMessageStore;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+
+public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
+    private static final Logger ERROR_LOG = 
LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
+    private static final Logger ROCKSDB_LOG = 
LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
+    public static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+    private static final byte[] MAX_BYTES = "max".getBytes(CHARSET_UTF8);
+    private static final byte[] MIN_BYTES = "min".getBytes(CHARSET_UTF8);
+    private static final int BATCH_SIZE = 16;
+    private static final byte CTRL_0 = '\u0000';
+    private static final byte CTRL_A = '\u0001';
+    private static final byte CTRL_2 = '\u0002';
+
+    /**
+     * Rocksdb ConsumeQueue's store unit. Format:
+     *
+     * <pre>
+     * 
┌─────────────────────────┬───────────┬───────────────────────┬───────────┬───────────┬───────────┬───────────────────────┐
+     * │ Topic Bytes Array Size  │  CTRL_A   │   Topic Bytes Array   │  CTRL_A 
  │  QueueId  │  CTRL_A   │  ConsumeQueue Offset  │
+     * │        (4 Bytes)        │ (1 Bytes) │       (n Bytes)       │ (1 
Bytes) │ (4 Bytes) │ (1 Bytes) │     (8 Bytes)         │
+     * 
├─────────────────────────┴───────────┴───────────────────────┴───────────┴───────────┴───────────┴───────────────────────┤
+     * │                                                    Key Unit           
                                                  │
+     * │                                                                       
                                                  │
+     * </pre>
+     *
+     * <pre>
+     * 
┌─────────────────────────────┬───────────────────┬──────────────────┬──────────────────┬───────────────────────┐
+     * │  CommitLog Physical Offset  │      Body Size    │   Tag HashCode   │  
Msg Store Time  │  ConsumeQueue Offset  │
+     * │        (8 Bytes)            │      (4 Bytes)    │    (8 Bytes)     │  
  (8 Bytes)     │      (8 Bytes)        │
+     * 
├─────────────────────────────┴───────────────────┴──────────────────┴──────────────────┴───────────────────────┤
+     * │                                                    Value Unit         
                                        │
+     * │                                                                       
                                        │
+     * </pre>
+     * ConsumeQueue's store unit. Size:
+     * CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) + Msg 
Store Time(8) + ConsumeQueue Offset(8) =  36 Bytes
+     */
+    public static final int PHY_OFFSET_OFFSET = 0;
+    public static final int PHY_MSG_LEN_OFFSET = 8;
+    public static final int MSG_TAG_HASHCODE_OFFSET = 12;
+    public static final int MSG_STORE_TIME_SIZE_OFFSET = 20;
+    public static final int CQ_OFFSET_OFFSET = 28;
+    public static final int CQ_UNIT_SIZE = 36;
+
+    private static final int MAX_KEY_LEN = 300;
+    private static final int MAX_VALUE_LEN = CQ_UNIT_SIZE;
+
+    /**
+     * Rocksdb ConsumeQueue's Offset unit. Format:
+     *
+     * <pre>
+     * 
┌─────────────────────────┬───────────┬───────────────────────┬───────────┬───────────┬───────────┬─────────────┐
+     * │ Topic Bytes Array Size  │  CTRL_A   │   Topic Bytes Array   │  CTRL_A 
  │  Max(min) │  CTRL_A   │   QueueId   │
+     * │        (4 Bytes)        │ (1 Bytes) │       (n Bytes)       │ (1 
Bytes) │ (3 Bytes) │ (1 Bytes) │  (4 Bytes)  │
+     * 
├─────────────────────────┴───────────┴───────────────────────┴───────────┴───────────┴───────────┴─────────────┤
+     * │                                                    Key Unit           
                                        │
+     * │                                                                       
                                        │
+     * </pre>
+     *
+     * <pre>
+     * ┌─────────────────────────────┬───────────┬────────────────────────┐
+     * │  CommitLog Physical Offset  │  CTRL_A   │   ConsumeQueue Offset  │
+     * │        (8 Bytes)            │ (1 Bytes) │    (8 Bytes)           │
+     * ├─────────────────────────────┴───────────┴────────────────────────┤
+     * │                     Value Unit                                   │
+     * │                                                                  │
+     * </pre>
+     * ConsumeQueue's Offset unit. Size: CommitLog Physical Offset(8) + 
CTRL_A(1) + ConsumeQueue Offset(8) =  17 Bytes
+     */
+    private static final int OFFSET_PHY_OFFSET = 0;
+    private static final int OFFSET_CQ_OFFSET = 9;
+
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    private final String storePath;
+    private final ConsumeQueueRocksDBStorage rocksDBStorage;
+    private final FlushOptions flushOptions;
+    private final WriteBatch writeBatch;
+    private final List<DispatchRequest> bufferDRList;
+    private final List<Pair<ByteBuffer, ByteBuffer>> cqBBPairList;
+    private final List<Pair<ByteBuffer, ByteBuffer>> offsetBBPairList;
+
+    private final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> 
tempTopicQueueMaxOffsetMap;
+
+    private final Map<String/* topic-queueId */, PhyAndCQOffset> 
topicQueueMinCqOffset;
+    private final Map<String/* topic-queueId */, Long> topicQueueMaxCqOffset;
+    private final ByteBuffer maxPhyOffsetBB;
+
+    private static final ByteBuffer INNER_CHECKPOINT_TOPIC;
+    private static final int INNER_CHECKPOINT_TOPIC_LEN;
+    static {
+        byte[] topicBytes = "CHECKPOINT_TOPIC".getBytes(CHARSET_UTF8);
+        INNER_CHECKPOINT_TOPIC_LEN = 14 + topicBytes.length;
+        INNER_CHECKPOINT_TOPIC = 
ByteBuffer.allocateDirect(INNER_CHECKPOINT_TOPIC_LEN);
+        buildOffsetKeyBB0(INNER_CHECKPOINT_TOPIC, topicBytes, 0, true);
+    }
+    private volatile boolean isCQError = false;
+
+    private FixedSizeCache<TopicQueueOffset, ByteBuffer> consumeQueueCache;
+
+    public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
+        super(messageStore);
+
+        this.storePath = 
StorePathConfigHelper.getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir());
+        this.rocksDBStorage = new ConsumeQueueRocksDBStorage(messageStore, 
storePath, 4);
+        this.flushOptions = new FlushOptions();
+        this.flushOptions.setWaitForFlush(true);
+        this.flushOptions.setAllowWriteStall(false);
+
+        this.writeBatch = new WriteBatch();
+        this.bufferDRList = new ArrayList(BATCH_SIZE);

Review Comment:
   Raw use of parameterized class



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to