anmolanmol1234 commented on code in PR #7801:
URL: https://github.com/apache/hadoop/pull/7801#discussion_r2215686620


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java:
##########
@@ -0,0 +1,637 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * The Read Buffer Manager for Rest AbfsClient.
+ * V1 implementation of ReadBufferManager.
+ */
+final class ReadBufferManagerV1 implements ReadBufferManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(
+      ReadBufferManagerV1.class);
+  private static final int ONE_KB = 1024;
+  private static final int ONE_MB = ONE_KB * ONE_KB;
+
+  private static final int NUM_BUFFERS = 16;
+  private static final int NUM_THREADS = 8;
+  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have 
to see if 3 seconds is a good threshold
+
+  private static int blockSize = 4 * ONE_MB;
+  private static int thresholdAgeMilliseconds = 
DEFAULT_THRESHOLD_AGE_MILLISECONDS;
+  private Thread[] threads = new Thread[NUM_THREADS];
+  private byte[][] buffers;    // array of byte[] buffers, to hold the data 
that is read
+  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] 
array that are available
+
+  private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of 
requests that are not picked up by any worker thread yet
+  private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // 
requests being processed by worker threads
+  private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // 
buffers available for reading
+  private static ReadBufferManagerV1 bufferManager; // singleton, initialized 
in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();
+
+  static ReadBufferManagerV1 getBufferManager() {
+    if (bufferManager == null) {
+      LOCK.lock();
+      try {
+        if (bufferManager == null) {
+          bufferManager = new ReadBufferManagerV1();
+          bufferManager.init();
+        }
+      } finally {
+        LOCK.unlock();
+      }
+    }
+    return bufferManager;
+  }
+
+  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+    if (bufferManager == null) {
+      LOGGER.debug(
+          "ReadBufferManagerV1 not initialized yet. Overriding 
readAheadBlockSize as {}",
+          readAheadBlockSize);
+      blockSize = readAheadBlockSize;
+    }
+  }
+
+  private void init() {
+    buffers = new byte[NUM_BUFFERS][];
+    for (int i = 0; i < NUM_BUFFERS; i++) {
+      buffers[i] = new byte[blockSize];  // same buffers are reused. The byte 
array never goes back to GC
+      freeList.add(i);
+    }
+    for (int i = 0; i < NUM_THREADS; i++) {
+      Thread t = new Thread(new ReadBufferWorker(i));
+      t.setDaemon(true);
+      threads[i] = t;
+      t.setName("ABFS-prefetch-" + i);
+      t.start();
+    }
+    ReadBufferWorker.UNLEASH_WORKERS.countDown();
+  }
+
+  // hide instance constructor
+  private ReadBufferManagerV1() {
+    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
+  }
+
+  /**
+   * {@link AbfsInputStream} calls this method to queue read-aheads.
+   *
+   * @param stream          The {@link AbfsInputStream} for which to do the 
read-ahead
+   * @param requestedOffset The offset in the file which shoukd be read
+   * @param requestedLength The length to read
+   */
+  @Override
+  public void queueReadAhead(final AbfsInputStream stream, final long 
requestedOffset, final int requestedLength,
+      TracingContext tracingContext) {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
+          stream.getPath(), requestedOffset, requestedLength);
+    }
+    ReadBuffer buffer;
+    synchronized (this) {
+      if (isAlreadyQueued(stream, requestedOffset)) {
+        return; // already queued, do not queue again
+      }
+      if (freeList.isEmpty() && !tryEvict()) {
+        return; // no buffers available, cannot queue anything
+      }
+
+      buffer = new ReadBuffer();
+      buffer.setStream(stream);
+      buffer.setOffset(requestedOffset);
+      buffer.setLength(0);
+      buffer.setRequestedLength(requestedLength);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+      buffer.setTracingContext(tracingContext);
+
+      Integer bufferIndex = freeList.pop();  // will return a value, since we 
have checked size > 0 already
+
+      buffer.setBuffer(buffers[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+      readAheadQueue.add(buffer);
+      notifyAll();
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx 
{}",
+            stream.getPath(), requestedOffset, buffer.getBufferindex());
+      }
+    }
+  }
+
+  /**
+   * {@link AbfsInputStream} calls this method read any bytes already 
available in a buffer (thereby saving a
+   * remote read). This returns the bytes if the data already exists in 
buffer. If there is a buffer that is reading
+   * the requested offset, then this method blocks until that read completes. 
If the data is queued in a read-ahead
+   * but not picked up by a worker thread yet, then it cancels that read-ahead 
and reports cache miss. This is because
+   * depending on worker thread availability, the read-ahead may take a while 
- the calling thread can do it's own
+   * read to get the data faster (copmared to the read waiting in queue for an 
indeterminate amount of time).
+   *
+   * @param stream   the file to read bytes for
+   * @param position the offset in the file to do a read for
+   * @param length   the length to read
+   * @param buffer   the buffer to read data into. Note that the buffer will 
be written into from offset 0.
+   * @return the number of bytes read
+   */
+  @Override
+  public int getBlock(final AbfsInputStream stream, final long position, final 
int length, final byte[] buffer)
+      throws IOException {
+    // not synchronized, so have to be careful with locking
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("getBlock for file {}  position {}  thread {}",
+          stream.getPath(), position, Thread.currentThread().getName());
+    }
+
+    waitForProcess(stream, position);
+
+    int bytesRead = 0;
+    synchronized (this) {
+      bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
+    }
+    if (bytesRead > 0) {
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Done read from Cache for {} position {} length {}",
+            stream.getPath(), position, bytesRead);
+      }
+      return bytesRead;
+    }
+
+    // otherwise, just say we got nothing - calling thread can do its own read
+    return 0;
+  }
+
+  /**
+   * ReadBufferWorker thread calls this to get the next buffer that it should 
work on.
+   *
+   * @return {@link ReadBuffer}
+   * @throws InterruptedException if thread is interrupted
+   */
+  @Override
+  public ReadBuffer getNextBlockToRead() throws InterruptedException {
+    ReadBuffer buffer = null;
+    synchronized (this) {
+      //buffer = readAheadQueue.take();  // blocking method

Review Comment:
   remove comments



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to