[ https://issues.apache.org/jira/browse/HADOOP-19613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18008780#comment-18008780 ]
ASF GitHub Bot commented on HADOOP-19613: ----------------------------------------- anujmodi2021 commented on code in PR #7801: URL: https://github.com/apache/hadoop/pull/7801#discussion_r2219958922 ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java: ########## @@ -0,0 +1,637 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Stack; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.classification.VisibleForTesting; + +/** + * The Read Buffer Manager for Rest AbfsClient. + * V1 implementation of ReadBufferManager. + */ +final class ReadBufferManagerV1 implements ReadBufferManager { + private static final Logger LOGGER = LoggerFactory.getLogger( + ReadBufferManagerV1.class); + private static final int ONE_KB = 1024; + private static final int ONE_MB = ONE_KB * ONE_KB; + + private static final int NUM_BUFFERS = 16; + private static final int NUM_THREADS = 8; + private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold + + private static int blockSize = 4 * ONE_MB; + private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS; + private Thread[] threads = new Thread[NUM_THREADS]; + private byte[][] buffers; // array of byte[] buffers, to hold the data that is read + private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available + + private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet + private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads + private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading + private static ReadBufferManagerV1 bufferManager; // singleton, initialized in static initialization block + private static final ReentrantLock LOCK = new ReentrantLock(); + + static ReadBufferManagerV1 getBufferManager() { + if (bufferManager == null) { + LOCK.lock(); + try { + if (bufferManager == null) { + bufferManager = new ReadBufferManagerV1(); + bufferManager.init(); + } + } finally { + LOCK.unlock(); + } + } + return bufferManager; + } + + static void setReadBufferManagerConfigs(int readAheadBlockSize) { + if (bufferManager == null) { + LOGGER.debug( + "ReadBufferManagerV1 not initialized yet. Overriding readAheadBlockSize as {}", + readAheadBlockSize); + blockSize = readAheadBlockSize; + } + } + + private void init() { + buffers = new byte[NUM_BUFFERS][]; + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = new byte[blockSize]; // same buffers are reused. The byte array never goes back to GC + freeList.add(i); + } + for (int i = 0; i < NUM_THREADS; i++) { + Thread t = new Thread(new ReadBufferWorker(i)); + t.setDaemon(true); + threads[i] = t; + t.setName("ABFS-prefetch-" + i); + t.start(); + } + ReadBufferWorker.UNLEASH_WORKERS.countDown(); + } + + // hide instance constructor + private ReadBufferManagerV1() { + LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); + } + + /** + * {@link AbfsInputStream} calls this method to queue read-aheads. + * + * @param stream The {@link AbfsInputStream} for which to do the read-ahead + * @param requestedOffset The offset in the file which shoukd be read + * @param requestedLength The length to read + */ + @Override + public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, + TracingContext tracingContext) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", + stream.getPath(), requestedOffset, requestedLength); + } + ReadBuffer buffer; + synchronized (this) { + if (isAlreadyQueued(stream, requestedOffset)) { + return; // already queued, do not queue again + } + if (freeList.isEmpty() && !tryEvict()) { + return; // no buffers available, cannot queue anything + } + + buffer = new ReadBuffer(); + buffer.setStream(stream); + buffer.setOffset(requestedOffset); + buffer.setLength(0); + buffer.setRequestedLength(requestedLength); + buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); + buffer.setLatch(new CountDownLatch(1)); + buffer.setTracingContext(tracingContext); + + Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already + + buffer.setBuffer(buffers[bufferIndex]); + buffer.setBufferindex(bufferIndex); + readAheadQueue.add(buffer); + notifyAll(); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", + stream.getPath(), requestedOffset, buffer.getBufferindex()); + } + } + } + + /** + * {@link AbfsInputStream} calls this method read any bytes already available in a buffer (thereby saving a + * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading + * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead + * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because + * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own + * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time). + * + * @param stream the file to read bytes for + * @param position the offset in the file to do a read for + * @param length the length to read + * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. + * @return the number of bytes read Review Comment: Added in base class > ABFS: [ReadAheadV2] Refactor ReadBufferManager to isolate new code with the > current working code > ------------------------------------------------------------------------------------------------ > > Key: HADOOP-19613 > URL: https://issues.apache.org/jira/browse/HADOOP-19613 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/azure > Affects Versions: 3.5.0, 3.4.1 > Reporter: Anuj Modi > Assignee: Anuj Modi > Priority: Major > Labels: pull-request-available > > Read Buffer Manager used today was introduced way back and has been stable > for quite a while. > Read Buffer Manager to be introduced as part of > https://issues.apache.org/jira/browse/HADOOP-19596 will introduce many > changes incrementally over time. While the development goes on and we are > able to fully stabilise the optimized version we need the current flow to be > functional and undisturbed. > This work item is to isolate that from new code by refactoring > ReadBufferManager class to have 2 different implementations with same public > interfaces: ReadBufferManagerV1 and ReadBufferManagerV2. > This will also introduce new configs that can be used to toggle between new > and old code. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org