wuzhanpeng commented on code in PR #3111: URL: https://github.com/apache/bookkeeper/pull/3111#discussion_r851147600
########## bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadAheadManager.java: ########## @@ -0,0 +1,655 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie.storage.ldb; + +import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.EntryLogger; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import 
org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.SafeRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A management tool that supports asynchronous read-ahead operations for {@link SingleDirectoryDbLedgerStorage}. + **/ +public class ReadAheadManager { + + private static final Logger log = LoggerFactory.getLogger(ReadAheadManager.class); + + public static final String READ_AHEAD_MAX_MESSAGES = "dbStorage_readAheadMaxMessages"; + public static final String READ_AHEAD_MAX_BYTES = "dbStorage_readAheadMaxBytes"; + public static final String READ_AHEAD_PRE_TRIGGER_RATIO = "dbStorage_readAheadPreTriggerRatio"; + public static final String READ_AHEAD_TASK_EXPIRED_TIME_MS = "dbStorage_readAheadTaskExpiredTimeMs"; + public static final String READ_AHEAD_TIMEOUT_MS = "dbStorage_readAheadTimeoutMs"; + + // operation behavior indicator + public static final String SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = "dbStorage_submitReadAheadTaskImmediately"; + public static final String READ_AHEAD_TASK_POOL_SIZE = "dbStorage_readAheadTaskPoolSize"; + + private static final int DEFAULT_READ_AHEAD_MESSAGES = 1000; + private static final int DEFAULT_READ_AHEAD_BYTES = 256 * 1024; + private static final double DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO = 0.75; + + private static final long DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS = 60 * 1000; + private static final long DEFAULT_READ_AHEAD_TIMEOUT_MS = 30 * 1000; + + private static final boolean DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY = false; + private static final int DEFAULT_READ_AHEAD_TASK_POOL_SIZE = 8; + + private static final class LedgerEntryPosition { + + private long ledgerId; + private long entryId; + + public LedgerEntryPosition(long ledgerId, long entryId) { + this.ledgerId = ledgerId; + this.entryId = entryId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + 
LedgerEntryPosition that = (LedgerEntryPosition) o; + return ledgerId == that.ledgerId + && entryId == that.entryId; + } + + @Override + public int hashCode() { + return Objects.hash(ledgerId, entryId); + } + } + + private static final class ReadAheadPos { + + private final long ledgerId; + private final long entryId; + private final long location; + private final long createTimeMs; + + private long readAheadTaskExpiredTimeMs; + + public ReadAheadPos(long ledgerId, long entryId, long location, long readAheadTaskExpiredTimeMs) { + + this.ledgerId = ledgerId; + this.entryId = entryId; + this.location = location; + + this.createTimeMs = System.currentTimeMillis(); + this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs; + } + + public boolean isExpired() { + return (System.currentTimeMillis() - createTimeMs) > readAheadTaskExpiredTimeMs; + } + + public long getLedgerId() { + return ledgerId; + } + + public long getEntryId() { + return entryId; + } + + public long getLocation() { + return location; + } + } + + private final ExecutorService readAheadExecutor; + private final ScheduledExecutorService cleanupExecutor; + + private final ConcurrentHashMap<LedgerEntryPosition, ReadAheadPos> pendingReadAheadPositions; + private final ConcurrentLinkedQueue<LedgerEntryPosition> pendingDeletePositions; + + private final ConcurrentHashMap<Long, NavigableMap<Long, ReadAheadTaskStatus>> inProgressReadAheadTaskStatuses; + private final ConcurrentLinkedQueue<ReadAheadTaskStatus> pendingDeleteReadAheadTaskStatuses; + + private final EntryLogger entryLogger; + private final EntryLocationIndex entryLocationIndex; + private final ReadCache cache; + + private final DbLedgerStorageStats dbLedgerStorageStats; + + private final boolean submitReadAheadTaskImmediately; + private final int readAheadMessages; + private final int readAheadBytes; + private final double preTriggerReadAheadRatio; + + private final long readAheadTaskExpiredTimeMs; + private final long readAheadTimeoutMs; + + 
/** + * Entrance for test cases. + * + * @param entryLogger + * @param entryLocationIndex + * @param cache + * @param dbLedgerStorageStats + */ + public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex, + ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats) { + this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats, + DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, DEFAULT_READ_AHEAD_TASK_POOL_SIZE, + DEFAULT_READ_AHEAD_MESSAGES, DEFAULT_READ_AHEAD_BYTES, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO, + DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS, DEFAULT_READ_AHEAD_TIMEOUT_MS); + } + + /** + * Entrance for normal use. + * + * @param entryLogger + * @param entryLocationIndex + * @param cache + * @param dbLedgerStorageStats + * @param conf + */ + public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex, + ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, ServerConfiguration conf) { + this(entryLogger, entryLocationIndex, cache, dbLedgerStorageStats, + conf.getBoolean(SUBMIT_READ_AHEAD_TASK_IMMEDIATELY, DEFAULT_SUBMIT_READ_AHEAD_TASK_IMMEDIATELY), + conf.getInt(READ_AHEAD_TASK_POOL_SIZE, DEFAULT_READ_AHEAD_TASK_POOL_SIZE), + conf.getInt(READ_AHEAD_MAX_MESSAGES, DEFAULT_READ_AHEAD_MESSAGES), + conf.getInt(READ_AHEAD_MAX_BYTES, DEFAULT_READ_AHEAD_BYTES), + conf.getDouble(READ_AHEAD_PRE_TRIGGER_RATIO, DEFAULT_PRE_TRIGGER_READ_AHEAD_RATIO), + conf.getLong(READ_AHEAD_TASK_EXPIRED_TIME_MS, DEFAULT_READ_AHEAD_TASK_EXPIRED_TIME_MS), + conf.getLong(READ_AHEAD_TIMEOUT_MS, DEFAULT_READ_AHEAD_TIMEOUT_MS)); + } + + public ReadAheadManager(EntryLogger entryLogger, EntryLocationIndex entryLocationIndex, + ReadCache cache, DbLedgerStorageStats dbLedgerStorageStats, + boolean submitReadAheadTaskImmediately, int readAheadTaskPoolSize, + int readAheadMessages, int readAheadBytes, double preTriggerReadAheadRatio, + long readAheadTaskExpiredTimeMs, long readAheadTimeoutMs) { + // core components initialization + 
readAheadExecutor = Executors.newFixedThreadPool( + readAheadTaskPoolSize, new DefaultThreadFactory("read-ahead")); + cleanupExecutor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("read-ahead-cleanup")); + + pendingReadAheadPositions = new ConcurrentHashMap<>(); + pendingDeletePositions = new ConcurrentLinkedQueue<>(); + + inProgressReadAheadTaskStatuses = new ConcurrentHashMap<>(); + pendingDeleteReadAheadTaskStatuses = new ConcurrentLinkedQueue<>(); + + // external assistant components assignment + this.entryLogger = entryLogger; + this.entryLocationIndex = entryLocationIndex; + this.cache = cache; + + // metrics + this.dbLedgerStorageStats = dbLedgerStorageStats; + + // configurable arguments + this.submitReadAheadTaskImmediately = submitReadAheadTaskImmediately; + this.readAheadMessages = readAheadMessages; + this.readAheadBytes = readAheadBytes; + this.preTriggerReadAheadRatio = preTriggerReadAheadRatio; + + this.readAheadTaskExpiredTimeMs = readAheadTaskExpiredTimeMs; + this.readAheadTimeoutMs = readAheadTimeoutMs; + + cleanupExecutor.scheduleAtFixedRate( + SafeRunnable.safeRun(this::removeExpiredReadAheadTasks), 30, 30, TimeUnit.SECONDS); + } + + public void shutdown() { + this.readAheadExecutor.shutdown(); + this.cleanupExecutor.shutdown(); + } + + private static void recordStatsInNano(OpStatsLogger logger, long startTimeNanos) { + logger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); + } + + private static void recordStatsInNano(Counter counter, long startTimeNanos) { + counter.add(MathUtils.elapsedNanos(startTimeNanos)); + } + + public void addNextReadPosition(long expectedLedgerId, long expectedEntryId, + long actualStartLedgerId, long actualStartEntryId, long location) { + LedgerEntryPosition lep = new LedgerEntryPosition(expectedLedgerId, expectedEntryId); + pendingReadAheadPositions.put(lep, new ReadAheadPos( + actualStartLedgerId, actualStartEntryId, location, 
readAheadTaskExpiredTimeMs)); + pendingDeletePositions.add(lep); + } + + protected ReadAheadTaskStatus getNearestTask(long ledgerId, long entryId) { + NavigableMap<Long, ReadAheadTaskStatus> ledgerReadAheadTaskStatuses = + inProgressReadAheadTaskStatuses.get(ledgerId); + if (ledgerReadAheadTaskStatuses != null) { + Map.Entry<Long, ReadAheadTaskStatus> floorEntry = ledgerReadAheadTaskStatuses.floorEntry(entryId); + if (floorEntry != null) { + return floorEntry.getValue(); + } + } + return null; + } + + /** + * Remove those read-ahead tasks which have already exceeded expired time. + * NOTE: this method is NOT thread-safe, thus it should be kept in a single thread. + */ + private void removeExpiredReadAheadTasks() { + // cleanup read-ahead pos + int reclaimedPositions = 0; + while (!pendingDeletePositions.isEmpty()) { + ReadAheadPos pos = pendingReadAheadPositions.computeIfPresent( + pendingDeletePositions.peek(), + (lep, rap) -> { + if (rap.isExpired()) { + return null; + } + return rap; + }); + if (pos == null) { + pendingDeletePositions.poll(); + reclaimedPositions++; + } else { + break; + } + } + + // cleanup read-ahead task + int reclaimedTasks = 0; + while (!pendingDeleteReadAheadTaskStatuses.isEmpty() + && pendingDeleteReadAheadTaskStatuses.peek().isExpired()) { + ReadAheadTaskStatus readAheadTaskStatus = pendingDeleteReadAheadTaskStatuses.poll(); + reclaimedTasks++; + inProgressReadAheadTaskStatuses.computeIfPresent( + readAheadTaskStatus.ledgerId, + (lid, ledgerReadAheadTaskStatuses) -> { + ledgerReadAheadTaskStatuses.remove(readAheadTaskStatus.startEntryId); + return ledgerReadAheadTaskStatuses.isEmpty() ? null : ledgerReadAheadTaskStatuses; + }); + } + + if (log.isDebugEnabled()) { + log.debug("Pending position map reclaimed {} positions, now is {}. 
" + + "Read-ahead task map reclaimed {} tasks, now is {}", + reclaimedPositions, pendingDeletePositions.size(), + reclaimedTasks, pendingDeleteReadAheadTaskStatuses.size()); + } + } + + /** + * This method could be invoked frequently. Please make it short and simple. + */ + public boolean hitInReadAheadPositions(long ledgerId, long entryId) { + AtomicBoolean isHit = new AtomicBoolean(false); + pendingReadAheadPositions.computeIfPresent( Review Comment: Yes, and entries that already exist in the cache will not be read again. In this example, the range (50, 100) will actually be read only once across the two read-ahead tasks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
