anmolanmol1234 commented on code in PR #7801: URL: https://github.com/apache/hadoop/pull/7801#discussion_r2215686620
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java: ########## @@ -0,0 +1,637 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Stack; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.classification.VisibleForTesting; + +/** + * The Read Buffer Manager for Rest AbfsClient. + * V1 implementation of ReadBufferManager. 
+ */ +final class ReadBufferManagerV1 implements ReadBufferManager { + private static final Logger LOGGER = LoggerFactory.getLogger( + ReadBufferManagerV1.class); + private static final int ONE_KB = 1024; + private static final int ONE_MB = ONE_KB * ONE_KB; + + private static final int NUM_BUFFERS = 16; + private static final int NUM_THREADS = 8; + private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold + + private static int blockSize = 4 * ONE_MB; + private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS; + private Thread[] threads = new Thread[NUM_THREADS]; + private byte[][] buffers; // array of byte[] buffers, to hold the data that is read + private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available + + private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet + private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads + private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading + private static ReadBufferManagerV1 bufferManager; // singleton, initialized in static initialization block + private static final ReentrantLock LOCK = new ReentrantLock(); + + static ReadBufferManagerV1 getBufferManager() { + if (bufferManager == null) { + LOCK.lock(); + try { + if (bufferManager == null) { + bufferManager = new ReadBufferManagerV1(); + bufferManager.init(); + } + } finally { + LOCK.unlock(); + } + } + return bufferManager; + } + + static void setReadBufferManagerConfigs(int readAheadBlockSize) { + if (bufferManager == null) { + LOGGER.debug( + "ReadBufferManagerV1 not initialized yet. 
Overriding readAheadBlockSize as {}", + readAheadBlockSize); + blockSize = readAheadBlockSize; + } + } + + private void init() { + buffers = new byte[NUM_BUFFERS][]; + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = new byte[blockSize]; // same buffers are reused. The byte array never goes back to GC + freeList.add(i); + } + for (int i = 0; i < NUM_THREADS; i++) { + Thread t = new Thread(new ReadBufferWorker(i)); + t.setDaemon(true); + threads[i] = t; + t.setName("ABFS-prefetch-" + i); + t.start(); + } + ReadBufferWorker.UNLEASH_WORKERS.countDown(); + } + + // hide instance constructor + private ReadBufferManagerV1() { + LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); + } + + /** + * {@link AbfsInputStream} calls this method to queue read-aheads. + * + * @param stream The {@link AbfsInputStream} for which to do the read-ahead + * @param requestedOffset The offset in the file which should be read + * @param requestedLength The length to read + */ + @Override + public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, + TracingContext tracingContext) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", + stream.getPath(), requestedOffset, requestedLength); + } + ReadBuffer buffer; + synchronized (this) { + if (isAlreadyQueued(stream, requestedOffset)) { + return; // already queued, do not queue again + } + if (freeList.isEmpty() && !tryEvict()) { + return; // no buffers available, cannot queue anything + } + + buffer = new ReadBuffer(); + buffer.setStream(stream); + buffer.setOffset(requestedOffset); + buffer.setLength(0); + buffer.setRequestedLength(requestedLength); + buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); + buffer.setLatch(new CountDownLatch(1)); + buffer.setTracingContext(tracingContext); + + Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already + + 
buffer.setBuffer(buffers[bufferIndex]); + buffer.setBufferindex(bufferIndex); + readAheadQueue.add(buffer); + notifyAll(); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", + stream.getPath(), requestedOffset, buffer.getBufferindex()); + } + } + } + + /** + * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a + * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading + * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead + * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because + * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own + * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time). + * + * @param stream the file to read bytes for + * @param position the offset in the file to do a read for + * @param length the length to read + * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. 
+ * @return the number of bytes read + */ + @Override + public int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) + throws IOException { + // not synchronized, so have to be careful with locking + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("getBlock for file {} position {} thread {}", + stream.getPath(), position, Thread.currentThread().getName()); + } + + waitForProcess(stream, position); + + int bytesRead = 0; + synchronized (this) { + bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); + } + if (bytesRead > 0) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done read from Cache for {} position {} length {}", + stream.getPath(), position, bytesRead); + } + return bytesRead; + } + + // otherwise, just say we got nothing - calling thread can do its own read + return 0; + } + + /** + * ReadBufferWorker thread calls this to get the next buffer that it should work on. + * + * @return {@link ReadBuffer} + * @throws InterruptedException if thread is interrupted + */ + @Override + public ReadBuffer getNextBlockToRead() throws InterruptedException { + ReadBuffer buffer = null; + synchronized (this) { + //buffer = readAheadQueue.take(); // blocking method Review Comment: remove comments -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org