wuzhanpeng commented on code in PR #3111:
URL: https://github.com/apache/bookkeeper/pull/3111#discussion_r851142486


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java:
##########
@@ -0,0 +1,655 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie.storage.ldb;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A management tool that supports asynchronous read-ahead operations for 
{@link SingleDirectoryDbLedgerStorage}.
+ **/
+public class ReadAheadManager {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ReadAheadManager.class);
+
+    public static final String READ_AHEAD_MAX_MESSAGES = 
"dbStorage_readAheadMaxMessages";
+    public static final String READ_AHEAD_MAX_BYTES = 
"dbStorage_readAheadMaxBytes";
+    public static final String READ_AHEAD_PRE_TRIGGER_RATIO = 
"dbStorage_readAheadPreTriggerRatio";
+    public static final String READ_AHEAD_TASK_EXPIRED_TIME_MS = 
"dbStorage_readAheadTaskExpiredTimeMs";
+    public static final String READ_AHEAD_TIMEOUT_MS = 
"dbStorage_readAheadTimeoutMs";
+
+    // operation behavior indicator
+    public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = 
"dbStorage_submitReadAheadTaskImmediately";
+    public static final String READ_AHEAD_TASK_POOL_SIZE = 
"dbStorage_readAheadTaskPoolSize";
+
+    private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000;
+    private static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024;
+    private static final double DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO = 0.75;
+
+    private static final long DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS = 60 * 
1000;
+    private static final long DEFAULT_READ_AHEAD_TIMEOUT_MS = 30 * 1000;
+
+    private static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = 
false;
+    private static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8;
+
+    private static final class LedgerEntryPosition {
+
+        private long ledgerId;
+        private long entryId;
+
+        public LedgerEntryPosition(long ledgerId, long entryId) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            LedgerEntryPosition that = (LedgerEntryPosition) o;
+            return ledgerId == that.ledgerId
+                    && entryId == that.entryId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(ledgerId, entryId);
+        }
+    }
+
+    private static final class ReadAheadPos {
+
+        private final long ledgerId;
+        private final long entryId;
+        private final long location;
+        private final long createTimeMs;
+
+        private long readAheadTaskExpiredTimeMs;
+
+        public ReadAheadPos(long ledgerId, long entryId, long location, long 
readAheadTaskExpiredTimeMs) {
+
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.location = location;
+
+            this.createTimeMs = System.currentTimeMillis();
+            this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs;
+        }
+
+        public boolean isExpired() {
+            return (System.currentTimeMillis() - createTimeMs) > 
readAheadTaskExpiredTimeMs;
+        }
+
+        public long getLedgerId() {
+            return ledgerId;
+        }
+
+        public long getEntryId() {
+            return entryId;
+        }
+
+        public long getLocation() {
+            return location;
+        }
+    }
+
+    private final ExecutorService readAheadExecutor;
+    private final ScheduledExecutorService cleanupExecutor;
+
+    private final ConcurrentHashMap<LedgerEntryPosition, ReadAheadPos> 
pendingReadAheadPositions;
+    private final ConcurrentLinkedQueue<LedgerEntryPosition> 
pendingDeletePositions;
+
+    private final ConcurrentHashMap<Long, NavigableMap<Long, 
ReadAheadTaskStatus>> inProgressReadAheadTaskStatuses;
+    private final ConcurrentLinkedQueue<ReadAheadTaskStatus> 
pendingDeleteReadAheadTaskStatuses;
+
+    private final EntryLogger entryLogger;
+    private final EntryLocationIndex entryLocationIndex;
+    private final ReadCache cache;
+
+    private final DbLedgerStorageStats dbLedgerStorageStats;
+
+    private final boolean submitReadAheadTaskImmediately;
+    private final int readAheadMessages;
+    private final int readAheadBytes;
+    private final double preTriggerReadAheadRatio;
+
+    private final long readAheadTaskExpiredTimeMs;
+    private final long readAheadTimeoutMs;
+
+    /**
+     * Entrance for test cases.
+     *
+     * @param entryLogger
+     * @param entryLocationIndex
+     * @param cache
+     * @param dbLedgerStorageStats
+     */
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex 
entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats 
dbLedgerStorageStats) {
+        this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats,
+                DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, 
DEFAULT_READ_AHEAD_TASK_POOL_SIZE,
+                DEFAULT_READ_AHEAD_MESSAGES, DEFAULT_READ_AHEAD_BYTES, 
DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO,
+                DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS, 
DEFAULT_READ_AHEAD_TIMEOUT_MS);
+    }
+
+    /**
+     * Entrance for normal use.
+     *
+     * @param entryLogger
+     * @param entryLocationIndex
+     * @param cache
+     * @param dbLedgerStorageStats
+     * @param conf
+     */
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex 
entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats 
dbLedgerStorageStats, ServerConfiguration conf) {
+        this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats,
+                conf.getBoolean(SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, 
DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY),
+                conf.getInt(READ_AHEAD_TASK_POOL_SIZE, 
DEFAULT_READ_AHEAD_TASK_POOL_SIZE),
+                conf.getInt(READ_AHEAD_MAX_MESSAGES, 
DEFAULT_READ_AHEAD_MESSAGES),
+                conf.getInt(READ_AHEAD_MAX_BYTES, DEFAULT_READ_AHEAD_BYTES),
+                conf.getDouble(READ_AHEAD_PRE_TRIGGER_RATIO, 
DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO),
+                conf.getLong(READ_AHEAD_TASK_EXPIRED_TIME_MS, 
DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS),
+                conf.getLong(READ_AHEAD_TIMEOUT_MS, 
DEFAULT_READ_AHEAD_TIMEOUT_MS));
+    }
+
+    public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex 
entryLocationIndex,
+                            ReadCache cache, DbLedgerStorageStats 
dbLedgerStorageStats,
+                            boolean submitReadAheadTaskImmediately, int 
readAheadTaskPoolSize,
+                            int readAheadMessages, int readAheadBytes, double 
preTriggerReadAheadRatio,
+                            long readAheadTaskExpiredTimeMs, long 
readAheadTimeoutMs) {
+        // core components initialization
+        readAheadExecutor = Executors.newFixedThreadPool(
+                readAheadTaskPoolSize, new DefaultThreadFactory("read-ahead"));
+        cleanupExecutor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("read-ahead-cleanup"));
+
+        pendingReadAheadPositions = new ConcurrentHashMap<>();
+        pendingDeletePositions = new ConcurrentLinkedQueue<>();
+
+        inProgressReadAheadTaskStatuses = new ConcurrentHashMap<>();
+        pendingDeleteReadAheadTaskStatuses = new ConcurrentLinkedQueue<>();
+
+        // external assistant components assignment
+        this.entryLogger = entryLogger;
+        this.entryLocationIndex = entryLocationIndex;
+        this.cache = cache;
+
+        // metrics
+        this.dbLedgerStorageStats = dbLedgerStorageStats;
+
+        // configurable arguments
+        this.submitReadAheadTaskImmediately = submitReadAheadTaskImmediately;
+        this.readAheadMessages = readAheadMessages;
+        this.readAheadBytes = readAheadBytes;
+        this.preTriggerReadAheadRatio = preTriggerReadAheadRatio;
+
+        this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs;
+        this.readAheadTimeoutMs = readAheadTimeoutMs;
+
+        cleanupExecutor.scheduleAtFixedRate(
+                SafeRunnable.safeRun(this::removeExpiredReadAheadTasks), 30, 
30, TimeUnit.SECONDS);
+    }
+
+    public void shutdown() {
+        this.readAheadExecutor.shutdown();
+        this.cleanupExecutor.shutdown();
+    }
+
+    private static void recordStatsInNano(OpStatsLogger logger, long 
startTimeNanos) {
+        logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), 
TimeUnit.NANOSECONDS);
+    }
+
+    private static void recordStatsInNano(Counter counter, long 
startTimeNanos) {
+        counter.add(MathUtils.elapsedNanos(startTimeNanos));
+    }
+
+    public void addNextReadPosition(long expectedLedgerId, long 
expectedEntryId,
+                                    long actualStartLedgerId, long 
actualStartEntryId, long location) {
+        LedgerEntryPosition lep = new LedgerEntryPosition(expectedLedgerId, 
expectedEntryId);
+        pendingReadAheadPositions.put(lep, new ReadAheadPos(

Review Comment:
   A separate cleanup thread periodically removes expired read-ahead 
positions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to