anujmodi2021 commented on code in PR #6699:
URL: https://github.com/apache/hadoop/pull/6699#discussion_r1565182630


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -309,5 +309,19 @@ public static String accountProperty(String property, 
String account) {
    * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
    */
   public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = 
"fs.azure.buffered.pread.disable";
+
+  /**
+   * Disable the call of HEAD call for opening a inputStream. ReadPath API of 
server
+   * would give the contentLength and eTag which would be used in subsequent 
calls
+   * for if-match headers.
+   */
+  public static final String FS_AZURE_HEAD_CALL_OPTIMIZATION_INPUT_STREAM = 
"fs.azure.head.call.optimization.input.stream";

Review Comment:
   I would prefer: `fs.azure.input.stream.lazy.open.optimization.enabled`



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -309,5 +309,19 @@ public static String accountProperty(String property, 
String account) {
    * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
    */
   public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = 
"fs.azure.buffered.pread.disable";
+
+  /**
+   * Disable the call of HEAD call for opening a inputStream. ReadPath API of 
server
+   * would give the contentLength and eTag which would be used in subsequent 
calls
+   * for if-match headers.
+   */
+  public static final String FS_AZURE_HEAD_CALL_OPTIMIZATION_INPUT_STREAM = 
"fs.azure.head.call.optimization.input.stream";
+
+  /**
+   * Enable prefetch on the first read to {@link 
org.apache.hadoop.fs.azurebfs.services.AbfsInputStream}.
+   * If disabled, first call would not trigger prefetch. Prefetch would be 
switched on
+   * after first read call.
+   */
+  public static final String FS_AZURE_PREFETCH_FIRST_READ_CALL = 
"fs.azure.prefetch.first.read.call";

Review Comment:
   I would prefer: `fs.azure.prefetch.on.first.read.enabled`



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -367,6 +367,14 @@ public class AbfsConfiguration{
       FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = 
DEFAULT_ENABLE_PAGINATED_DELETE)
   private boolean isPaginatedDeleteEnabled;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_HEAD_CALL_OPTIMIZATION_INPUT_STREAM,
+      DefaultValue = DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM)
+  private boolean headOptimizationForInputStream;
+
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_PREFETCH_FIRST_READ_CALL,
+      DefaultValue = DEFAULT_PREFETCH_READAHEAD_ON_FIRST_READ)
+  private boolean prefetchReadaheadOnFirstRead;

Review Comment:
   Prefetch and read-ahead are synonyms, so using both in the name is redundant.
   I would prefer: `isPrefetchOnFirstReadEnabled`



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -848,32 +848,37 @@ public AbfsInputStream openFileForRead(Path path,
               encryptionContext.getBytes(StandardCharsets.UTF_8));
         }
       } else {
-        AbfsHttpOperation op = client.getPathStatus(relativePath, false,
-            tracingContext, null).getResult();
-        resourceType = op.getResponseHeader(
-            HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-        contentLength = Long.parseLong(
-            op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-        eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
-        /*
-         * For file created with ENCRYPTION_CONTEXT, client shall receive
-         * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
-         */
-        if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
-          final String fileEncryptionContext = op.getResponseHeader(
-              HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
-          if (fileEncryptionContext == null) {
-            LOG.debug("EncryptionContext missing in GetPathStatus response");
-            throw new PathIOException(path.toString(),
-                "EncryptionContext not present in GetPathStatus response 
headers");
+        if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT
+            || !abfsConfiguration.getHeadOptimizationForInputStream()) {
+          final AbfsHttpOperation op = client.getPathStatus(relativePath, 
false,
+              tracingContext, null).getResult();
+          resourceType = op.getResponseHeader(
+              HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+          contentLength = Long.parseLong(
+              op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+          eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+          /*
+           * For file created with ENCRYPTION_CONTEXT, client shall receive
+           * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
+           */
+          if (client.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) 
{
+            final String fileEncryptionContext = op.getResponseHeader(
+                HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
+            if (fileEncryptionContext == null) {
+              LOG.debug("EncryptionContext missing in GetPathStatus response");
+              throw new PathIOException(path.toString(),
+                  "EncryptionContext not present in GetPathStatus response 
headers");
+            }
+            contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
+                client.getEncryptionContextProvider(), getRelativePath(path),
+                fileEncryptionContext.getBytes(StandardCharsets.UTF_8));
           }
-          contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
-              client.getEncryptionContextProvider(), getRelativePath(path),
-              fileEncryptionContext.getBytes(StandardCharsets.UTF_8));
         }
       }
 
-      if (parseIsDirectory(resourceType)) {
+      if (!abfsConfiguration.getHeadOptimizationForInputStream()

Review Comment:
   Could there be a case where this optimization is enabled and the user passes a 
valid versioned FileStatus whose resource type is directory?
   
   If yes, it would be better to fail in that case as well.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, 
final int len)
     // data need to be copied to user buffer from index bCursor,
     // AbfsInutStream buffer is going to contain data from last block start. In
     // that case bCursor will be set to fCursor - lastBlockStart
-    long lastBlockStart = max(0, contentLength - footerReadSize);
+    if (!fileStatusInformationPresent.get()) {
+      long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+      bCursor = (int) (fCursor - lastBlockStart);
+      return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, 
footerReadSize), true);
+    }
+    long lastBlockStart = max(0, getContentLength() - footerReadSize);
     bCursor = (int) (fCursor - lastBlockStart);
     // 0 if contentlength is < buffersize
-    long actualLenToRead = min(footerReadSize, contentLength);
-    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+    long actualLenToRead = min(footerReadSize, getContentLength());
+    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
   }
 
   private int optimisedRead(final byte[] b, final int off, final int len,
-      final long readFrom, final long actualLen) throws IOException {
+      final long readFrom, final long actualLen,
+      final boolean isReadWithoutContentLengthInformation) throws IOException {
     fCursor = readFrom;
     int totalBytesRead = 0;
     int lastBytesRead = 0;
     try {
       buffer = new byte[bufferSize];
+      boolean fileStatusInformationPresentBeforeRead = 
fileStatusInformationPresent.get();
       for (int i = 0;
-           i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+           i < MAX_OPTIMIZED_READ_ATTEMPTS && 
(!fileStatusInformationPresent.get()
+               || fCursor < getContentLength()); i++) {

Review Comment:
   Can we add a comment here explaining what happens when the content length is 
not available? The condition `fCursor < getContentLength()` will be false in that 
case, yet we still want to enter the loop?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -285,13 +335,23 @@ public synchronized int read(final byte[] b, final int 
off, final int len) throw
     return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
   }
 
-  private boolean shouldReadFully() {
+  private boolean shouldReadFully(int lengthToRead) {

Review Comment:
   Can we have a comment on the logic used here?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -192,6 +210,30 @@ public int read(long position, byte[] buffer, int offset, 
int length)
         throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
       }
     }
+
+    /*
+     * When the inputStream is started, if the application tries to parallelly 
read

Review Comment:
   Some typos in the comments



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java:
##########
@@ -367,6 +367,14 @@ public class AbfsConfiguration{
       FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = 
DEFAULT_ENABLE_PAGINATED_DELETE)
   private boolean isPaginatedDeleteEnabled;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_HEAD_CALL_OPTIMIZATION_INPUT_STREAM,
+      DefaultValue = DEFAULT_HEAD_OPTIMIZATION_INPUT_STREAM)
+  private boolean headOptimizationForInputStream;

Review Comment:
   For boolean variables, it reads better to start the variable name with `is`,
   like `is<feature>Enabled`.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, 
final int len)
     // data need to be copied to user buffer from index bCursor,
     // AbfsInutStream buffer is going to contain data from last block start. In
     // that case bCursor will be set to fCursor - lastBlockStart
-    long lastBlockStart = max(0, contentLength - footerReadSize);
+    if (!fileStatusInformationPresent.get()) {
+      long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+      bCursor = (int) (fCursor - lastBlockStart);
+      return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, 
footerReadSize), true);
+    }
+    long lastBlockStart = max(0, getContentLength() - footerReadSize);
     bCursor = (int) (fCursor - lastBlockStart);
     // 0 if contentlength is < buffersize
-    long actualLenToRead = min(footerReadSize, contentLength);
-    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+    long actualLenToRead = min(footerReadSize, getContentLength());
+    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);

Review Comment:
   Same here for the last parameter: the content length is known, so why is it false?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -165,5 +165,7 @@ public static ApiVersion getCurrentVersion() {
    */
   public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
 
+  public static final Integer READ_PATH_REQUEST_NOT_SATISFIABLE = 416;

Review Comment:
   Should we name this following the format used by the other HTTP status code 
constants, for example:
   `HTTP_RANGE_NOT_SATISFIABLE`?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -360,7 +420,10 @@ private int readFileCompletely(final byte[] b, final int 
off, final int len)
     // data need to be copied to user buffer from index bCursor, bCursor has
     // to be the current fCusor
     bCursor = (int) fCursor;
-    return optimisedRead(b, off, len, 0, contentLength);
+    if (!fileStatusInformationPresent.get()) {
+      return optimisedRead(b, off, len, 0, bufferSize, false);
+    }
+    return optimisedRead(b, off, len, 0, getContentLength(), false);

Review Comment:
   Shouldn't the last parameter be true here?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -285,13 +335,23 @@ public synchronized int read(final byte[] b, final int 
off, final int len) throw
     return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
   }
 
-  private boolean shouldReadFully() {
+  private boolean shouldReadFully(int lengthToRead) {
+    if (!fileStatusInformationPresent.get()) {
+      return (lengthToRead + fCursor) <= this.bufferSize
+          && this.firstRead && this.context.readSmallFilesCompletely();
+    }
+
     return this.firstRead && this.context.readSmallFilesCompletely()
-        && this.contentLength <= this.bufferSize;
+        && getContentLength() <= this.bufferSize;
   }
 
-  private boolean shouldReadLastBlock() {
-    long footerStart = max(0, this.contentLength - FOOTER_SIZE);
+  private boolean shouldReadLastBlock(int lengthToRead) {
+    if (!fileStatusInformationPresent.get()) {

Review Comment:
   Comment on new logic here as well



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -582,6 +701,29 @@ int readRemote(long position, byte[] b, int offset, int 
length, TracingContext t
     return (int) bytesRead;
   }
 
+  private void initPathPropertiesFromReadPathResponseHeader(final 
AbfsHttpOperation op) throws IOException {
+    if (DIRECTORY.equals(
+        op.getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE))) {
+      throw new FileNotFoundException(
+          "read must be used with files and not directories. Path: " + path);
+    }
+    contentLength = parseFromRange(
+        op.getResponseHeader(HttpHeaderConfigurations.CONTENT_RANGE));
+    eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+    fileStatusInformationPresent.set(true);

Review Comment:
   Should we check here that contentLength and eTag are valid before 
setting `fileStatusInformationPresent` to true?
   
   Or is it okay for contentLength to be -1?
   I see some handling for that already.
   Just a thought.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -376,32 +439,48 @@ private int readLastBlock(final byte[] b, final int off, 
final int len)
     // data need to be copied to user buffer from index bCursor,
     // AbfsInutStream buffer is going to contain data from last block start. In
     // that case bCursor will be set to fCursor - lastBlockStart
-    long lastBlockStart = max(0, contentLength - footerReadSize);
+    if (!fileStatusInformationPresent.get()) {
+      long lastBlockStart = max(0, (fCursor + len) - footerReadSize);
+      bCursor = (int) (fCursor - lastBlockStart);
+      return optimisedRead(b, off, len, lastBlockStart, min(fCursor + len, 
footerReadSize), true);
+    }
+    long lastBlockStart = max(0, getContentLength() - footerReadSize);
     bCursor = (int) (fCursor - lastBlockStart);
     // 0 if contentlength is < buffersize
-    long actualLenToRead = min(footerReadSize, contentLength);
-    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
+    long actualLenToRead = min(footerReadSize, getContentLength());
+    return optimisedRead(b, off, len, lastBlockStart, actualLenToRead, false);
   }
 
   private int optimisedRead(final byte[] b, final int off, final int len,
-      final long readFrom, final long actualLen) throws IOException {
+      final long readFrom, final long actualLen,
+      final boolean isReadWithoutContentLengthInformation) throws IOException {
     fCursor = readFrom;
     int totalBytesRead = 0;
     int lastBytesRead = 0;
     try {
       buffer = new byte[bufferSize];
+      boolean fileStatusInformationPresentBeforeRead = 
fileStatusInformationPresent.get();
       for (int i = 0;
-           i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
+           i < MAX_OPTIMIZED_READ_ATTEMPTS && 
(!fileStatusInformationPresent.get()
+               || fCursor < getContentLength()); i++) {
         lastBytesRead = readInternal(fCursor, buffer, limit,
             (int) actualLen - limit, true);
         if (lastBytesRead > 0) {
           totalBytesRead += lastBytesRead;
+          boolean shouldBreak = !fileStatusInformationPresentBeforeRead
+              && totalBytesRead == (int) actualLen;
           limit += lastBytesRead;
           fCursor += lastBytesRead;
           fCursorAfterLastRead = fCursor;
+          if (shouldBreak) {
+            break;
+          }
         }
       }
     } catch (IOException e) {
+      if (isNonRetriableOptimizedReadException(e)) {
+        throw e;

Review Comment:
   Why throw the exception here?
   Don't we want to fall back to readOneBlock() for these?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to