mukund-thakur commented on a change in pull request #2368:
URL: https://github.com/apache/hadoop/pull/2368#discussion_r501750162



##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -89,9 +91,24 @@ public AbfsInputStream(
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
     this.readAheadEnabled = true;
+    this.alwaysReadBufferSize
+        = abfsInputStreamContext.shouldReadBufferSizeAlways();
     this.cachedSasToken = new CachedSASToken(
         abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
     this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
+    readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
+    if (this.bufferSize > readAheadBlockSize) {

Review comment:
       Can this LOG/validation be moved to AbfsInputStreamContext.build()?
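
       For illustration, a sketch of what I mean, assuming AbfsInputStreamContext keeps a builder-style build() (field names outside the hunk are my guesses):

           // Hypothetical: validate/auto-align once in AbfsInputStreamContext.build()
           // so every stream gets a consistent context.
           public AbfsInputStreamContext build() {
             if (readBufferSize > readAheadBlockSize) {
               LOG.debug("fs.azure.read.request.size [{}] > "
                   + "fs.azure.read.readahead.blocksize [{}]; aligning the "
                   + "readahead block size to the read buffer size",
                   readBufferSize, readAheadBlockSize);
               readAheadBlockSize = readBufferSize;
             }
             return this;
           }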

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
##########
@@ -37,10 +39,10 @@
  private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
 
   private static final int NUM_BUFFERS = 16;
-  private static final int BLOCK_SIZE = 4 * 1024 * 1024;
   private static final int NUM_THREADS = 8;
  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
 
+  private static int blockSize = 4 * 1024 * 1024;

Review comment:
       nit: use 4 * ONE_MB, consistent with everywhere else.
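
       i.e. something like (ONE_MB being the existing constant in FileSystemConfigurations):

           private static int blockSize = 4 * ONE_MB;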

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -89,9 +91,24 @@ public AbfsInputStream(
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
     this.readAheadEnabled = true;
+    this.alwaysReadBufferSize
+        = abfsInputStreamContext.shouldReadBufferSizeAlways();
     this.cachedSasToken = new CachedSASToken(
         abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
     this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
+    readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
+    if (this.bufferSize > readAheadBlockSize) {
+      LOG.debug(
+          "fs.azure.read.request.size[={}] is configured for higher size than "
+              + "fs.azure.read.readahead.blocksize[={}]. Auto-align "
+              + "readAhead block size to be same as readRequestSize.",
+          bufferSize, readAheadBlockSize);
+      readAheadBlockSize = this.bufferSize;
+    }
+
+    // Propagate the config values to ReadBufferManager so that the first instance
+    // to initialize it get can set the readAheadBlockSize

Review comment:
       nit: typo? initialize it get can set

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -178,11 +195,15 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
         buffer = new byte[bufferSize];
       }
 
-      // Enable readAhead when reading sequentially
-      if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
+      if (alwaysReadBufferSize) {
         bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);

Review comment:
       The JIRA and PR description say we are trying to always read up to bufferSize rather than just the requested length, but as per this line we are also enabling the buffer manager readahead, which is bypassed for random reads in Gen2 as per line 205 below. PS: I have never seen the Gen1 code though.
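
       To make the concern concrete, one shape that would keep the full-bufferSize read without also forcing readahead for random reads (a sketch only, assuming the last boolean of readInternal still selects the bypass path as in the current code):

           if (alwaysReadBufferSize
               || -1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor
               || b.length >= bufferSize) {
             // sequential (or forced full-buffer) read: goes through readahead
             bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
           } else {
             // random read: still read bufferSize bytes, but bypass readahead
             bytesRead = readInternal(fCursor, buffer, 0, bufferSize, true);
           }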

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
##########
@@ -49,21 +51,37 @@
  private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
  private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
  private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
-  private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
+  private static ReadBufferManager bufferManager; // singleton, initialized in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();
 
-  static {
-    BUFFER_MANAGER = new ReadBufferManager();
-    BUFFER_MANAGER.init();
+  static ReadBufferManager getBufferManager() {

Review comment:
       Why all these changes? Why not just initialize the blockSize in init()?
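
       For comparison, the simpler version I had in mind (hypothetical; the init(int) overload and the initialized flag are my invention):

           private static final ReadBufferManager BUFFER_MANAGER = new ReadBufferManager();
           private static boolean initialized = false;

           static synchronized ReadBufferManager getBufferManager(final int readAheadBlockSize) {
             if (!initialized) {
               BUFFER_MANAGER.init(readAheadBlockSize); // sets blockSize before buffers are allocated
               initialized = true;
             }
             return BUFFER_MANAGER;
           }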

##########
File path: hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
##########
@@ -74,6 +74,9 @@
   public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
 
   public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
+  public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;

Review comment:
       I think putting these configs together with DEFAULT_READ_BUFFER_SIZE would make the code more readable. Also use 4 * ONE_MB as used above.
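
       i.e. roughly (DEFAULT_READ_BUFFER_SIZE already exists in this file; the readahead constant's name and value here are my guess):

           public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB;
           public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
           public static final int DEFAULT_READAHEAD_BLOCK_SIZE = 4 * ONE_MB;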

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
##########
@@ -464,4 +483,53 @@ int getCompletedReadListSize() {
   void callTryEvict() {
     tryEvict();
   }
+
+  @VisibleForTesting
+  void testResetReadBufferManager() {

Review comment:
       Please add some reasoning/docs around these changes. Thanks.
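
       For example, a javadoc along these lines (wording is just a suggestion):

           /**
            * Test helper: tears down the singleton's queues and buffers and
            * re-initializes them, so a test can change static state such as
            * the readahead block size without being affected by earlier
            * tests. Not for production use.
            */
           @VisibleForTesting
           void testResetReadBufferManager() {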

##########
File path: 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
##########
@@ -223,16 +244,19 @@ private int readInternal(final long position, final byte[] b, final int offset,
 
       // queue read-aheads
       int numReadAheads = this.readAheadQueueDepth;
-      long nextSize;
       long nextOffset = position;
+      // First read to queue needs to be of readBufferSize and later

Review comment:
       Would like to understand the reasoning behind this. Thanks.
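
       In case it helps the discussion, my reading of the new queueing logic (a sketch; the loop shape and bounds are my reconstruction from the hunks, not the actual patch):

           long nextOffset = position;
           // first request queued covers the caller's read buffer size...
           long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
           while (numReadAheads > 0 && nextOffset < contentLength) {
             ReadBufferManager.getBufferManager()
                 .queueReadAhead(this, nextOffset, (int) nextSize);
             nextOffset += nextSize;
             // ...while later read-aheads fall back to readAheadBlockSize
             nextSize = Math.min((long) readAheadBlockSize, contentLength - nextOffset);
             numReadAheads--;
           }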



