[ 
https://issues.apache.org/jira/browse/HADOOP-16202?focusedWorklogId=755223&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755223
 ]

ASF GitHub Bot logged work on HADOOP-16202:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Apr/22 13:34
            Start Date: 11/Apr/22 13:34
    Worklog Time Spent: 10m 
      Work Description: mehakmeet commented on code in PR #2584:
URL: https://github.com/apache/hadoop/pull/2584#discussion_r847102156


##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md:
##########
@@ -0,0 +1,122 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# `FileSystem.openFile()`/`FileContext.openFile()`
+
+This is a method provided by both FileSystem and FileContext for
+advanced file opening options and, where implemented,
+an asynchrounous/lazy opening of a file.
+
+Creates a builder to open a file, supporting options
+both standard and filesystem specific. The return
+value of the `build()` call is a `Future<FSDataInputStream>`,
+which must be waited on. The file opening may be
+asynchronous, and it may actually be postponed (including
+permission/existence checks) until reads are actually
+performed.
+
+This API call was added to `FileSystem` and `FileContext` in
+Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows.
+
+* Added `opt(key, long)` and `must(key, long)`.
+* Declared that `withFileStatus(null)` is allowed.
+* Declared that `withFileStatus(status)` only checks
+  the filename of the path, not the full path.
+  This is needed to support passthrough/mounted filesystems.
+* Added standard option keys.
+
+###  <a name="openfile_path_"></a> `FutureDataInputStreamBuilder openFile(Path 
path)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct a operation to open the file at `path` for reading.
+
+When `build()` is invoked on the returned `FutureDataInputStreamBuilder` 
instance,
+the builder parameters are verified and
+`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or
+`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked.
+
+These protected methods returns a `CompletableFuture<FSDataInputStream>`
+which, when its `get()` method is called, either returns an input
+stream of the contents of opened file, or raises an exception.
+
+The base implementation of the `FileSystem.openFileWithOptions(PathHandle, 
OpenFileParameters)`
+ultimately invokes `FileSystem.open(Path, int)`.
+
+Thus the chain `FileSystem.openFile(path).build().get()` has the same 
preconditions
+and postconditions as `FileSystem.open(Path p, int bufferSize)`
+
+However, there is one difference which implementations are free to
+take advantage of:
+
+The returned stream MAY implement a lazy open where file non-existence or
+access permission failures may not surface until the first `read()` of the
+actual data.
+
+This saves network IO on object stores.
+
+The `openFile()` operation MAY check the state of the filesystem during its
+invocation, but as the state of the filesystem may change betwen this call and

Review Comment:
   typo: "between"



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -4799,23 +4802,26 @@ public AWSCredentialProviderList shareCredentials(final 
String purpose) {
   @Retries.RetryTranslated
   @AuditEntryPoint
   private FSDataInputStream select(final Path source,
-      final String expression,
       final Configuration options,
-      final Optional<S3AFileStatus> providedStatus)
+      final OpenFileSupport.OpenFileInformation fileInformation)

Review Comment:
   Javadoc correction: Remove params not needed from javadocs of this method.



##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md:
##########
@@ -0,0 +1,122 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# `FileSystem.openFile()`/`FileContext.openFile()`
+
+This is a method provided by both FileSystem and FileContext for
+advanced file opening options and, where implemented,
+an asynchrounous/lazy opening of a file.
+
+Creates a builder to open a file, supporting options
+both standard and filesystem specific. The return
+value of the `build()` call is a `Future<FSDataInputStream>`,
+which must be waited on. The file opening may be
+asynchronous, and it may actually be postponed (including
+permission/existence checks) until reads are actually
+performed.
+
+This API call was added to `FileSystem` and `FileContext` in
+Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows.
+
+* Added `opt(key, long)` and `must(key, long)`.
+* Declared that `withFileStatus(null)` is allowed.
+* Declared that `withFileStatus(status)` only checks
+  the filename of the path, not the full path.
+  This is needed to support passthrough/mounted filesystems.
+* Added standard option keys.
+
+###  <a name="openfile_path_"></a> `FutureDataInputStreamBuilder openFile(Path 
path)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct a operation to open the file at `path` for reading.
+
+When `build()` is invoked on the returned `FutureDataInputStreamBuilder` 
instance,
+the builder parameters are verified and
+`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or
+`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked.
+
+These protected methods returns a `CompletableFuture<FSDataInputStream>`
+which, when its `get()` method is called, either returns an input
+stream of the contents of opened file, or raises an exception.
+
+The base implementation of the `FileSystem.openFileWithOptions(PathHandle, 
OpenFileParameters)`
+ultimately invokes `FileSystem.open(Path, int)`.
+
+Thus the chain `FileSystem.openFile(path).build().get()` has the same 
preconditions
+and postconditions as `FileSystem.open(Path p, int bufferSize)`
+
+However, there is one difference which implementations are free to
+take advantage of:
+
+The returned stream MAY implement a lazy open where file non-existence or
+access permission failures may not surface until the first `read()` of the
+actual data.
+
+This saves network IO on object stores.
+
+The `openFile()` operation MAY check the state of the filesystem during its
+invocation, but as the state of the filesystem may change betwen this call and
+the actual `build()` and `get()` operations, this file-specific
+preconditions (file exists, file is readable, etc) MUST NOT be checked here.
+
+FileSystem implementations which do not implement `open(Path, int)`
+MAY postpone raising an `UnsupportedOperationException` until either the
+`FutureDataInputStreamBuilder.build()` or the subsequent `get()` call,
+else they MAY fail fast in the `openFile()` call.
+
+Consult [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html) for 
details
+on how to use the builder, and for standard options which may be pssed in.

Review Comment:
   typo: "passed"



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+  private static final ChangeDetectionPolicy CHANGE_POLICY =
+      ChangeDetectionPolicy.createPolicy(
+          ChangeDetectionPolicy.Mode.Server,
+          ChangeDetectionPolicy.Source.None,
+          false);
+
+  private static final long READ_AHEAD_RANGE = 16;
+
+  private static final String USERNAME = "hadoop";
+
+  public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+  public static final String TESTFILE = "s3a://bucket/name";
+
+  private static final Path TESTPATH = new Path(TESTFILE);
+
+  /**
+   * Create a OpenFileSupport instance.
+   */
+  private static final OpenFileSupport PREPARE =
+      new OpenFileSupport(
+          CHANGE_POLICY,
+          READ_AHEAD_RANGE,
+          USERNAME,
+          IO_FILE_BUFFER_SIZE_DEFAULT,
+          DEFAULT_ASYNC_DRAIN_THRESHOLD,
+          INPUT_POLICY);
+
+  @Test
+  public void testSimpleFile() throws Throwable {
+    ObjectAssert<OpenFileSupport.OpenFileInformation>
+        asst = assertFileInfo(
+            PREPARE.openSimpleFile(1024));
+
+    asst.extracting(f -> f.getChangePolicy())
+        .isEqualTo(CHANGE_POLICY);
+    asst.extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+    asst.extracting(f -> f.getReadAheadRange())
+        .isEqualTo(READ_AHEAD_RANGE);
+  }
+
+  /**
+   * Initiate an assert from an open file information instance.
+   * @param fi file info
+   * @return an assert stream.
+   */
+  private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+      final OpenFileSupport.OpenFileInformation fi) {
+    return Assertions.assertThat(fi)
+        .describedAs("File Information %s", fi);
+  }
+
+  /**
+   * Create an assertion about the openFile information from a configuration
+   * with the given key/value option.
+   * @param key key to set.
+   * @param option option value.
+   * @return the constructed OpenFileInformation.
+   */
+  public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+      final String key,
+      final String option) throws IOException {
+    return assertFileInfo(prepareToOpenFile(params(key, option)));
+  }
+
+  @Test
+  public void testUnknownMandatoryOption() throws Throwable {
+
+    String key = "unknown";
+    intercept(IllegalArgumentException.class, key, () ->
+        prepareToOpenFile(params(key, "undefined")));
+  }
+
+  @Test
+  public void testSeekRandomIOPolicy() throws Throwable {
+
+    // ask for random IO
+    String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+    // is picked up
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+    // and as neither status nor length was set: no file status
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getStatus())
+        .isNull();
+  }
+
+  /**
+   * There's a standard policy name. 'adaptive',
+   * meaning 'whatever this stream does to adapt to the client's use'.
+   * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+   */
+  @Test
+  public void testSeekPolicyAdaptive() throws Throwable {
+
+    // when caller asks for adaptive, they get "normal"
+    assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  /**
+   * Verify that an unknown seek policy falls back to
+   * {@link S3AInputPolicy#Normal}. Delete
+   */
+  @Test
+  public void testUnknownSeekPolicyS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "undefined")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+  }
+
+  /**
+   * The S3A option also supports a list of values.
+   */
+  @Test
+  public void testSeekPolicyListS3AOption() throws Throwable {
+    // fall back to the normal seek policy.

Review Comment:
   nit: Correct the comment. Since we aren't falling back, but using one of the 
listed policy.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+  private static final ChangeDetectionPolicy CHANGE_POLICY =
+      ChangeDetectionPolicy.createPolicy(
+          ChangeDetectionPolicy.Mode.Server,
+          ChangeDetectionPolicy.Source.None,
+          false);
+
+  private static final long READ_AHEAD_RANGE = 16;
+
+  private static final String USERNAME = "hadoop";
+
+  public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+  public static final String TESTFILE = "s3a://bucket/name";
+
+  private static final Path TESTPATH = new Path(TESTFILE);
+
+  /**
+   * Create a OpenFileSupport instance.
+   */
+  private static final OpenFileSupport PREPARE =
+      new OpenFileSupport(
+          CHANGE_POLICY,
+          READ_AHEAD_RANGE,
+          USERNAME,
+          IO_FILE_BUFFER_SIZE_DEFAULT,
+          DEFAULT_ASYNC_DRAIN_THRESHOLD,
+          INPUT_POLICY);
+
+  @Test
+  public void testSimpleFile() throws Throwable {
+    ObjectAssert<OpenFileSupport.OpenFileInformation>
+        asst = assertFileInfo(
+            PREPARE.openSimpleFile(1024));
+
+    asst.extracting(f -> f.getChangePolicy())
+        .isEqualTo(CHANGE_POLICY);
+    asst.extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+    asst.extracting(f -> f.getReadAheadRange())
+        .isEqualTo(READ_AHEAD_RANGE);
+  }
+
+  /**
+   * Initiate an assert from an open file information instance.
+   * @param fi file info
+   * @return an assert stream.
+   */
+  private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+      final OpenFileSupport.OpenFileInformation fi) {
+    return Assertions.assertThat(fi)
+        .describedAs("File Information %s", fi);
+  }
+
+  /**
+   * Create an assertion about the openFile information from a configuration
+   * with the given key/value option.
+   * @param key key to set.
+   * @param option option value.
+   * @return the constructed OpenFileInformation.
+   */
+  public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+      final String key,
+      final String option) throws IOException {
+    return assertFileInfo(prepareToOpenFile(params(key, option)));
+  }
+
+  @Test
+  public void testUnknownMandatoryOption() throws Throwable {
+
+    String key = "unknown";
+    intercept(IllegalArgumentException.class, key, () ->
+        prepareToOpenFile(params(key, "undefined")));
+  }
+
+  @Test
+  public void testSeekRandomIOPolicy() throws Throwable {
+
+    // ask for random IO
+    String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+    // is picked up
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+    // and as neither status nor length was set: no file status
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getStatus())
+        .isNull();
+  }
+
+  /**
+   * There's a standard policy name. 'adaptive',
+   * meaning 'whatever this stream does to adapt to the client's use'.
+   * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+   */
+  @Test
+  public void testSeekPolicyAdaptive() throws Throwable {
+
+    // when caller asks for adaptive, they get "normal"
+    assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  /**
+   * Verify that an unknown seek policy falls back to
+   * {@link S3AInputPolicy#Normal}. Delete

Review Comment:
   nit: "Delete" is a mistype?



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+  private static final ChangeDetectionPolicy CHANGE_POLICY =
+      ChangeDetectionPolicy.createPolicy(
+          ChangeDetectionPolicy.Mode.Server,
+          ChangeDetectionPolicy.Source.None,
+          false);
+
+  private static final long READ_AHEAD_RANGE = 16;
+
+  private static final String USERNAME = "hadoop";
+
+  public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+  public static final String TESTFILE = "s3a://bucket/name";
+
+  private static final Path TESTPATH = new Path(TESTFILE);
+
+  /**
+   * Create a OpenFileSupport instance.
+   */
+  private static final OpenFileSupport PREPARE =
+      new OpenFileSupport(
+          CHANGE_POLICY,
+          READ_AHEAD_RANGE,
+          USERNAME,
+          IO_FILE_BUFFER_SIZE_DEFAULT,
+          DEFAULT_ASYNC_DRAIN_THRESHOLD,
+          INPUT_POLICY);
+
+  @Test
+  public void testSimpleFile() throws Throwable {
+    ObjectAssert<OpenFileSupport.OpenFileInformation>
+        asst = assertFileInfo(
+            PREPARE.openSimpleFile(1024));
+
+    asst.extracting(f -> f.getChangePolicy())
+        .isEqualTo(CHANGE_POLICY);
+    asst.extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+    asst.extracting(f -> f.getReadAheadRange())
+        .isEqualTo(READ_AHEAD_RANGE);
+  }
+
+  /**
+   * Initiate an assert from an open file information instance.
+   * @param fi file info
+   * @return an assert stream.
+   */
+  private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+      final OpenFileSupport.OpenFileInformation fi) {
+    return Assertions.assertThat(fi)
+        .describedAs("File Information %s", fi);
+  }
+
+  /**
+   * Create an assertion about the openFile information from a configuration
+   * with the given key/value option.
+   * @param key key to set.
+   * @param option option value.
+   * @return the constructed OpenFileInformation.
+   */
+  public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+      final String key,
+      final String option) throws IOException {
+    return assertFileInfo(prepareToOpenFile(params(key, option)));
+  }
+
+  @Test
+  public void testUnknownMandatoryOption() throws Throwable {
+
+    String key = "unknown";
+    intercept(IllegalArgumentException.class, key, () ->
+        prepareToOpenFile(params(key, "undefined")));
+  }
+
+  @Test
+  public void testSeekRandomIOPolicy() throws Throwable {
+
+    // ask for random IO
+    String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+    // is picked up
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+    // and as neither status nor length was set: no file status
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getStatus())
+        .isNull();
+  }
+
+  /**
+   * There's a standard policy name. 'adaptive',
+   * meaning 'whatever this stream does to adapt to the client's use'.
+   * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+   */
+  @Test
+  public void testSeekPolicyAdaptive() throws Throwable {
+
+    // when caller asks for adaptive, they get "normal"
+    assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  /**
+   * Verify that an unknown seek policy falls back to
+   * {@link S3AInputPolicy#Normal}. Delete
+   */
+  @Test
+  public void testUnknownSeekPolicyS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "undefined")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+  }
+
+  /**
+   * The S3A option also supports a list of values.
+   */
+  @Test
+  public void testSeekPolicyListS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "hbase, random")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  /**
+   * Verify that if a list of policies is supplied in a configuration,
+   * the first recognized policy will be adopted.
+   */
+  @Test
+  public void testSeekPolicyExtractionFromList() throws Throwable {
+    String plist = "a, b, RandOm, other ";
+    Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+    Collection<String> options = conf.getTrimmedStringCollection(
+        FS_OPTION_OPENFILE_READ_POLICY);
+    Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, 
null))
+        .describedAs("Policy from " + plist)
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  @Test
+  public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+        .describedAs("adaptive")
+        .isEqualTo(S3AInputPolicy.Normal);

Review Comment:
   Since at the moment normal, default, and adaptive all map to normal, 
shouldn't we also verify if the boolean isAdaptive is also true in this case? 



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+  private static final ChangeDetectionPolicy CHANGE_POLICY =
+      ChangeDetectionPolicy.createPolicy(
+          ChangeDetectionPolicy.Mode.Server,
+          ChangeDetectionPolicy.Source.None,
+          false);
+
+  private static final long READ_AHEAD_RANGE = 16;
+
+  private static final String USERNAME = "hadoop";
+
+  public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+  public static final String TESTFILE = "s3a://bucket/name";
+
+  private static final Path TESTPATH = new Path(TESTFILE);
+
+  /**
+   * Create a OpenFileSupport instance.
+   */
+  private static final OpenFileSupport PREPARE =
+      new OpenFileSupport(
+          CHANGE_POLICY,
+          READ_AHEAD_RANGE,
+          USERNAME,
+          IO_FILE_BUFFER_SIZE_DEFAULT,
+          DEFAULT_ASYNC_DRAIN_THRESHOLD,
+          INPUT_POLICY);
+
+  @Test
+  public void testSimpleFile() throws Throwable {
+    ObjectAssert<OpenFileSupport.OpenFileInformation>
+        asst = assertFileInfo(
+            PREPARE.openSimpleFile(1024));
+
+    asst.extracting(f -> f.getChangePolicy())
+        .isEqualTo(CHANGE_POLICY);
+    asst.extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+    asst.extracting(f -> f.getReadAheadRange())
+        .isEqualTo(READ_AHEAD_RANGE);
+  }
+
+  /**
+   * Initiate an assert from an open file information instance.
+   * @param fi file info
+   * @return an assert stream.
+   */
+  private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+      final OpenFileSupport.OpenFileInformation fi) {
+    return Assertions.assertThat(fi)
+        .describedAs("File Information %s", fi);
+  }
+
+  /**
+   * Create an assertion about the openFile information from a configuration
+   * with the given key/value option.
+   * @param key key to set.
+   * @param option option value.
+   * @return the constructed OpenFileInformation.
+   */
+  public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+      final String key,
+      final String option) throws IOException {
+    return assertFileInfo(prepareToOpenFile(params(key, option)));
+  }
+
+  @Test
+  public void testUnknownMandatoryOption() throws Throwable {
+
+    String key = "unknown";
+    intercept(IllegalArgumentException.class, key, () ->
+        prepareToOpenFile(params(key, "undefined")));
+  }
+
+  @Test
+  public void testSeekRandomIOPolicy() throws Throwable {
+
+    // ask for random IO
+    String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+    // is picked up
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+    // and as neither status nor length was set: no file status
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getStatus())
+        .isNull();
+  }
+
+  /**
+   * There's a standard policy name. 'adaptive',
+   * meaning 'whatever this stream does to adapt to the client's use'.
+   * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+   */
+  @Test
+  public void testSeekPolicyAdaptive() throws Throwable {
+
+    // when caller asks for adaptive, they get "normal"
+    assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  /**
+   * Verify that an unknown seek policy falls back to
+   * {@link S3AInputPolicy#Normal}. Delete
+   */
+  @Test
+  public void testUnknownSeekPolicyS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "undefined")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+  }
+
+  /**
+   * The S3A option also supports a list of values.
+   */
+  @Test
+  public void testSeekPolicyListS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "hbase, random")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  /**
+   * Verify that if a list of policies is supplied in a configuration,
+   * the first recognized policy will be adopted.
+   */
+  @Test
+  public void testSeekPolicyExtractionFromList() throws Throwable {
+    String plist = "a, b, RandOm, other ";

Review Comment:
   Would be better to include valid policies in this list to show the first 
valid policy is picked up.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+  private static final ChangeDetectionPolicy CHANGE_POLICY =
+      ChangeDetectionPolicy.createPolicy(
+          ChangeDetectionPolicy.Mode.Server,
+          ChangeDetectionPolicy.Source.None,
+          false);
+
+  private static final long READ_AHEAD_RANGE = 16;
+
+  private static final String USERNAME = "hadoop";
+
+  public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+  public static final String TESTFILE = "s3a://bucket/name";
+
+  private static final Path TESTPATH = new Path(TESTFILE);
+
+  /**
+   * Create a OpenFileSupport instance.
+   */
+  private static final OpenFileSupport PREPARE =
+      new OpenFileSupport(
+          CHANGE_POLICY,
+          READ_AHEAD_RANGE,
+          USERNAME,
+          IO_FILE_BUFFER_SIZE_DEFAULT,
+          DEFAULT_ASYNC_DRAIN_THRESHOLD,
+          INPUT_POLICY);
+
+  @Test
+  public void testSimpleFile() throws Throwable {
+    ObjectAssert<OpenFileSupport.OpenFileInformation>
+        asst = assertFileInfo(
+            PREPARE.openSimpleFile(1024));
+
+    asst.extracting(f -> f.getChangePolicy())
+        .isEqualTo(CHANGE_POLICY);
+    asst.extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+    asst.extracting(f -> f.getReadAheadRange())
+        .isEqualTo(READ_AHEAD_RANGE);
+  }
+
+  /**
+   * Initiate an assert from an open file information instance.
+   * @param fi file info
+   * @return an assert stream.
+   */
+  private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+      final OpenFileSupport.OpenFileInformation fi) {
+    return Assertions.assertThat(fi)
+        .describedAs("File Information %s", fi);
+  }
+
+  /**
+   * Create an assertion about the openFile information from a configuration
+   * with the given key/value option.
+   * @param key key to set.
+   * @param option option value.
+   * @return the constructed OpenFileInformation.
+   */
+  public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+      final String key,
+      final String option) throws IOException {
+    return assertFileInfo(prepareToOpenFile(params(key, option)));
+  }
+
+  @Test
+  public void testUnknownMandatoryOption() throws Throwable {
+
+    String key = "unknown";
+    intercept(IllegalArgumentException.class, key, () ->
+        prepareToOpenFile(params(key, "undefined")));
+  }
+
+  @Test
+  public void testSeekRandomIOPolicy() throws Throwable {
+
+    // ask for random IO
+    String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+    // is picked up
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+    // and as neither status nor length was set: no file status
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getStatus())
+        .isNull();
+  }
+
+  /**
+   * There's a standard policy name. 'adaptive',
+   * meaning 'whatever this stream does to adapt to the client's use'.
+   * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+   */
+  @Test
+  public void testSeekPolicyAdaptive() throws Throwable {
+
+    // when caller asks for adaptive, they get "normal"
+    assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  /**
+   * Verify that an unknown seek policy falls back to
+   * {@link S3AInputPolicy#Normal}. Delete
+   */
+  @Test
+  public void testUnknownSeekPolicyS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "undefined")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+  }
+
+  /**
+   * The S3A option also supports a list of values.
+   */
+  @Test
+  public void testSeekPolicyListS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "hbase, random")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  /**
+   * Verify that if a list of policies is supplied in a configuration,
+   * the first recognized policy will be adopted.
+   */
+  @Test
+  public void testSeekPolicyExtractionFromList() throws Throwable {
+    String plist = "a, b, RandOm, other ";
+    Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+    Collection<String> options = conf.getTrimmedStringCollection(
+        FS_OPTION_OPENFILE_READ_POLICY);
+    Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, 
null))
+        .describedAs("Policy from " + plist)
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  @Test
+  public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+        .describedAs("adaptive")
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  @Test
+  public void testUnknownSeekPolicyFallback() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
+        .describedAs("unkown policy")
+        .isNull();
+  }
+
+  /**
+   * Test the mapping of the standard option names.
+   */
+  @Test
+  public void testInputPolicyMapping() throws Throwable {
+    Object[][] policyMapping = {
+        {"normal", S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE, S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_RANDOM, S3AInputPolicy.Random},
+        {FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, S3AInputPolicy.Sequential},
+    };
+    for (Object[] mapping : policyMapping) {
+      String name = (String) mapping[0];
+      Assertions.assertThat(S3AInputPolicy.getPolicy(name, null))
+          .describedAs("Policy %s", name)
+          .isEqualTo(mapping[1]);
+    }
+  }
+
+  /**
+   * Verify readahead range is picked up.
+   */
+  @Test
+  public void testReadahead() throws Throwable {
+    // readahead range option
+    assertOpenFile(READAHEAD_RANGE, "4096")
+        .extracting(f -> f.getReadAheadRange())
+        .isEqualTo(4096L);
+  }
+
+  /** is
+   * Verify readahead range is picked up.
+   */
+  @Test
+  public void testBufferSize() throws Throwable {
+    // readahead range option

Review Comment:
   nit: bufferSize.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+  private static final ChangeDetectionPolicy CHANGE_POLICY =
+      ChangeDetectionPolicy.createPolicy(
+          ChangeDetectionPolicy.Mode.Server,
+          ChangeDetectionPolicy.Source.None,
+          false);
+
+  private static final long READ_AHEAD_RANGE = 16;
+
+  private static final String USERNAME = "hadoop";
+
+  public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+  public static final String TESTFILE = "s3a://bucket/name";
+
+  private static final Path TESTPATH = new Path(TESTFILE);
+
+  /**
+   * Create a OpenFileSupport instance.
+   */
+  private static final OpenFileSupport PREPARE =
+      new OpenFileSupport(
+          CHANGE_POLICY,
+          READ_AHEAD_RANGE,
+          USERNAME,
+          IO_FILE_BUFFER_SIZE_DEFAULT,
+          DEFAULT_ASYNC_DRAIN_THRESHOLD,
+          INPUT_POLICY);
+
+  @Test
+  public void testSimpleFile() throws Throwable {
+    ObjectAssert<OpenFileSupport.OpenFileInformation>
+        asst = assertFileInfo(
+            PREPARE.openSimpleFile(1024));
+
+    asst.extracting(f -> f.getChangePolicy())
+        .isEqualTo(CHANGE_POLICY);
+    asst.extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+    asst.extracting(f -> f.getReadAheadRange())
+        .isEqualTo(READ_AHEAD_RANGE);
+  }
+
+  /**
+   * Initiate an assert from an open file information instance.
+   * @param fi file info
+   * @return an assert stream.
+   */
+  private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+      final OpenFileSupport.OpenFileInformation fi) {
+    return Assertions.assertThat(fi)
+        .describedAs("File Information %s", fi);
+  }
+
+  /**
+   * Create an assertion about the openFile information from a configuration
+   * with the given key/value option.
+   * @param key key to set.
+   * @param option option value.
+   * @return the constructed OpenFileInformation.
+   */
+  public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+      final String key,
+      final String option) throws IOException {
+    return assertFileInfo(prepareToOpenFile(params(key, option)));
+  }
+
+  @Test
+  public void testUnknownMandatoryOption() throws Throwable {
+
+    String key = "unknown";
+    intercept(IllegalArgumentException.class, key, () ->
+        prepareToOpenFile(params(key, "undefined")));
+  }
+
+  @Test
+  public void testSeekRandomIOPolicy() throws Throwable {
+
+    // ask for random IO
+    String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+    // is picked up
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+    // and as neither status nor length was set: no file status
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getStatus())
+        .isNull();
+  }
+
+  /**
+   * There's a standard policy name. 'adaptive',
+   * meaning 'whatever this stream does to adapt to the client's use'.
+   * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+   */
+  @Test
+  public void testSeekPolicyAdaptive() throws Throwable {
+
+    // when caller asks for adaptive, they get "normal"
+    assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  /**
+   * Verify that an unknown seek policy falls back to
+   * {@link S3AInputPolicy#Normal}. Delete
+   */
+  @Test
+  public void testUnknownSeekPolicyS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "undefined")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+  }
+
+  /**
+   * The S3A option also supports a list of values.
+   */
+  @Test
+  public void testSeekPolicyListS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "hbase, random")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  /**
+   * Verify that if a list of policies is supplied in a configuration,
+   * the first recognized policy will be adopted.
+   */
+  @Test
+  public void testSeekPolicyExtractionFromList() throws Throwable {
+    String plist = "a, b, RandOm, other ";
+    Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+    Collection<String> options = conf.getTrimmedStringCollection(
+        FS_OPTION_OPENFILE_READ_POLICY);
+    Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, 
null))
+        .describedAs("Policy from " + plist)
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  @Test
+  public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+        .describedAs("adaptive")
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  @Test
+  public void testUnknownSeekPolicyFallback() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
+        .describedAs("unkown policy")
+        .isNull();
+  }
+
+  /**
+   * Test the mapping of the standard option names.
+   */
+  @Test
+  public void testInputPolicyMapping() throws Throwable {
+    Object[][] policyMapping = {
+        {"normal", S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE, S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, S3AInputPolicy.Normal},
+        {FS_OPTION_OPENFILE_READ_POLICY_RANDOM, S3AInputPolicy.Random},
+        {FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, S3AInputPolicy.Sequential},
+    };
+    for (Object[] mapping : policyMapping) {
+      String name = (String) mapping[0];
+      Assertions.assertThat(S3AInputPolicy.getPolicy(name, null))
+          .describedAs("Policy %s", name)
+          .isEqualTo(mapping[1]);
+    }
+  }
+
+  /**
+   * Verify readahead range is picked up.
+   */
+  @Test
+  public void testReadahead() throws Throwable {
+    // readahead range option
+    assertOpenFile(READAHEAD_RANGE, "4096")
+        .extracting(f -> f.getReadAheadRange())
+        .isEqualTo(4096L);
+  }
+
+  /** is
+   * Verify readahead range is picked up.
+   */
+  @Test

Review Comment:
   nit: Javadoc correction.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+
+import java.io.EOFException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
+import static 
org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Cost of openFile().
+ */
+public class ITestS3AOpenCost extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AOpenCost.class);
+
+  private Path testFile;
+
+  private FileStatus testFileStatus;
+
+  private long fileLength;
+
+  public ITestS3AOpenCost() {
+    super(true);
+  }
+
+  /**
+   * Setup creates a test file, saves is status and length
+   * to fields.
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    S3AFileSystem fs = getFileSystem();
+    testFile = methodPath();
+
+    writeTextFile(fs, testFile, "openfile", true);
+    testFileStatus = fs.getFileStatus(testFile);
+    fileLength = testFileStatus.getLen();
+  }
+
+  /**
+   * Test when openFile() performs GET requests when file status
+   * and length options are passed down.
+   * Note that the input streams only update the FS statistics
+   * in close(), so metrics cannot be verified until all operations
+   * on a stream are complete.
+   * This is slightly less than ideal.
+   */
+  @Test
+  public void testOpenFileWithStatusOfOtherFS() throws Throwable {
+    describe("Test cost of openFile with/without status; raw only");
+    S3AFileSystem fs = getFileSystem();
+
+    // now read that file back in using the openFile call.
+    // with a new FileStatus and a different path.
+    // this verifies that any FileStatus class/subclass is used
+    // as a source of the file length.
+    FileStatus st2 = new FileStatus(
+        fileLength, false,
+        testFileStatus.getReplication(),
+        testFileStatus.getBlockSize(),
+        testFileStatus.getModificationTime(),
+        testFileStatus.getAccessTime(),
+        testFileStatus.getPermission(),
+        testFileStatus.getOwner(),
+        testFileStatus.getGroup(),
+        new Path("gopher:///localhost/"; + testFile.getName()));
+
+    // no IO in open
+    FSDataInputStream in = verifyMetrics(() ->
+            fs.openFile(testFile)
+                .withFileStatus(st2)
+                .build()
+                .get(),
+        always(NO_IO),
+        with(STREAM_READ_OPENED, 0));
+
+    // the stream gets opened during read
+    long readLen = verifyMetrics(() ->
+            readStream(in),
+        always(NO_IO),
+        with(STREAM_READ_OPENED, 1));
+    assertEquals("bytes read from file", fileLength, readLen);
+  }
+
+  @Test
+  public void testOpenFileShorterLength() throws Throwable {
+    // do a second read with the length declared as short.
+    // we now expect the bytes read to be shorter.
+    S3AFileSystem fs = getFileSystem();
+
+    S3ATestUtils.MetricDiff bytesDiscarded =
+        new S3ATestUtils.MetricDiff(fs, STREAM_READ_BYTES_READ_CLOSE);
+    int offset = 2;
+    long shortLen = fileLength - offset;
+    // open the file
+    FSDataInputStream in2 = verifyMetrics(() ->
+            fs.openFile(testFile)
+                .must(FS_OPTION_OPENFILE_READ_POLICY,
+                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+                .opt(FS_OPTION_OPENFILE_LENGTH, shortLen)
+                .build()
+                .get(),
+        always(NO_IO),
+        with(STREAM_READ_OPENED, 0));
+
+    // verify that the statistics are in rane

Review Comment:
   nit: "range"



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+  private static final ChangeDetectionPolicy CHANGE_POLICY =
+      ChangeDetectionPolicy.createPolicy(
+          ChangeDetectionPolicy.Mode.Server,
+          ChangeDetectionPolicy.Source.None,
+          false);
+
+  private static final long READ_AHEAD_RANGE = 16;
+
+  private static final String USERNAME = "hadoop";
+
+  public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+  public static final String TESTFILE = "s3a://bucket/name";
+
+  private static final Path TESTPATH = new Path(TESTFILE);
+
+  /**
+   * Create a OpenFileSupport instance.
+   */
+  private static final OpenFileSupport PREPARE =
+      new OpenFileSupport(
+          CHANGE_POLICY,
+          READ_AHEAD_RANGE,
+          USERNAME,
+          IO_FILE_BUFFER_SIZE_DEFAULT,
+          DEFAULT_ASYNC_DRAIN_THRESHOLD,
+          INPUT_POLICY);
+
+  @Test
+  public void testSimpleFile() throws Throwable {
+    ObjectAssert<OpenFileSupport.OpenFileInformation>
+        asst = assertFileInfo(
+            PREPARE.openSimpleFile(1024));
+
+    asst.extracting(f -> f.getChangePolicy())
+        .isEqualTo(CHANGE_POLICY);
+    asst.extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+    asst.extracting(f -> f.getReadAheadRange())
+        .isEqualTo(READ_AHEAD_RANGE);
+  }
+
+  /**
+   * Initiate an assert from an open file information instance.
+   * @param fi file info
+   * @return an assert stream.
+   */
+  private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+      final OpenFileSupport.OpenFileInformation fi) {
+    return Assertions.assertThat(fi)
+        .describedAs("File Information %s", fi);
+  }
+
+  /**
+   * Create an assertion about the openFile information from a configuration
+   * with the given key/value option.
+   * @param key key to set.
+   * @param option option value.
+   * @return the constructed OpenFileInformation.
+   */
+  public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+      final String key,
+      final String option) throws IOException {
+    return assertFileInfo(prepareToOpenFile(params(key, option)));
+  }
+
+  @Test
+  public void testUnknownMandatoryOption() throws Throwable {
+
+    String key = "unknown";
+    intercept(IllegalArgumentException.class, key, () ->
+        prepareToOpenFile(params(key, "undefined")));
+  }
+
+  @Test
+  public void testSeekRandomIOPolicy() throws Throwable {
+
+    // ask for random IO
+    String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+    // is picked up
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+    // and as neither status nor length was set: no file status
+    assertOpenFile(INPUT_FADVISE, option)
+        .extracting(f -> f.getStatus())
+        .isNull();
+  }
+
+  /**
+   * There's a standard policy name. 'adaptive',
+   * meaning 'whatever this stream does to adapt to the client's use'.
+   * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+   */
+  @Test
+  public void testSeekPolicyAdaptive() throws Throwable {
+
+    // when caller asks for adaptive, they get "normal"
+    assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+        FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  /**
+   * Verify that an unknown seek policy falls back to
+   * {@link S3AInputPolicy#Normal}. Delete
+   */
+  @Test
+  public void testUnknownSeekPolicyS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "undefined")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+  }
+
+  /**
+   * The S3A option also supports a list of values.
+   */
+  @Test
+  public void testSeekPolicyListS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "hbase, random")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  /**
+   * Verify that if a list of policies is supplied in a configuration,
+   * the first recognized policy will be adopted.
+   */
+  @Test
+  public void testSeekPolicyExtractionFromList() throws Throwable {
+    String plist = "a, b, RandOm, other ";
+    Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+    Collection<String> options = conf.getTrimmedStringCollection(
+        FS_OPTION_OPENFILE_READ_POLICY);
+    Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, 
null))
+        .describedAs("Policy from " + plist)
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  @Test
+  public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+        .describedAs("adaptive")
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  @Test
+  public void testUnknownSeekPolicyFallback() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
+        .describedAs("unkown policy")

Review Comment:
   typo: "unknown"



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+
+import java.io.EOFException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
+import static 
org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Cost of openFile().
+ */
+public class ITestS3AOpenCost extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AOpenCost.class);
+
+  private Path testFile;
+
+  private FileStatus testFileStatus;
+
+  private long fileLength;
+
+  public ITestS3AOpenCost() {
+    super(true);
+  }
+
+  /**
+   * Setup creates a test file, saves is status and length
+   * to fields.
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    S3AFileSystem fs = getFileSystem();
+    testFile = methodPath();
+
+    writeTextFile(fs, testFile, "openfile", true);
+    testFileStatus = fs.getFileStatus(testFile);
+    fileLength = testFileStatus.getLen();
+  }
+
+  /**
+   * Test when openFile() performs GET requests when file status
+   * and length options are passed down.
+   * Note that the input streams only update the FS statistics
+   * in close(), so metrics cannot be verified until all operations
+   * on a stream are complete.
+   * This is slightly less than ideal.
+   */
+  @Test
+  public void testOpenFileWithStatusOfOtherFS() throws Throwable {
+    describe("Test cost of openFile with/without status; raw only");
+    S3AFileSystem fs = getFileSystem();
+
+    // now read that file back in using the openFile call.
+    // with a new FileStatus and a different path.
+    // this verifies that any FileStatus class/subclass is used
+    // as a source of the file length.
+    FileStatus st2 = new FileStatus(
+        fileLength, false,
+        testFileStatus.getReplication(),
+        testFileStatus.getBlockSize(),
+        testFileStatus.getModificationTime(),
+        testFileStatus.getAccessTime(),
+        testFileStatus.getPermission(),
+        testFileStatus.getOwner(),
+        testFileStatus.getGroup(),
+        new Path("gopher:///localhost/"; + testFile.getName()));
+
+    // no IO in open
+    FSDataInputStream in = verifyMetrics(() ->
+            fs.openFile(testFile)
+                .withFileStatus(st2)
+                .build()
+                .get(),
+        always(NO_IO),
+        with(STREAM_READ_OPENED, 0));
+
+    // the stream gets opened during read
+    long readLen = verifyMetrics(() ->
+            readStream(in),
+        always(NO_IO),

Review Comment:
   Little doubt here: When we read after opening the file, are we still on 0 IO?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 755223)
    Time Spent: 17h 20m  (was: 17h 10m)

> Enhance openFile() for better read performance against object stores 
> ---------------------------------------------------------------------
>
>                 Key: HADOOP-16202
>                 URL: https://issues.apache.org/jira/browse/HADOOP-16202
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs, fs/s3, tools/distcp
>    Affects Versions: 3.3.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 17h 20m
>  Remaining Estimate: 0h
>
> The {{openFile()}} builder API lets us add new options when reading a file
> Add an option {{"fs.s3a.open.option.length"}} which takes a long and allows 
> the length of the file to be declared. If set, *no check for the existence of 
> the file is issued when opening the file*
> Also: withFileStatus() to take any FileStatus implementation, rather than 
> only S3AFileStatus -and not check that the path matches the path being 
> opened. Needed to support viewFS-style wrapping and mounting.
> and Adopt where appropriate to stop clusters with S3A reads switched to 
> random IO from killing download/localization
> * fs shell copyToLocal
> * distcp
> * IOUtils.copy



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to