[
https://issues.apache.org/jira/browse/HADOOP-16202?focusedWorklogId=755223&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755223
]
ASF GitHub Bot logged work on HADOOP-16202:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 11/Apr/22 13:34
Start Date: 11/Apr/22 13:34
Worklog Time Spent: 10m
Work Description: mehakmeet commented on code in PR #2584:
URL: https://github.com/apache/hadoop/pull/2584#discussion_r847102156
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md:
##########
@@ -0,0 +1,122 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+# `FileSystem.openFile()`/`FileContext.openFile()`
+
+This is a method provided by both FileSystem and FileContext for
+advanced file opening options and, where implemented,
+an asynchronous/lazy opening of a file.
+
+Creates a builder to open a file, supporting options
+both standard and filesystem specific. The return
+value of the `build()` call is a `Future<FSDataInputStream>`,
+which must be waited on. The file opening may be
+asynchronous, and it may actually be postponed (including
+permission/existence checks) until reads are actually
+performed.
+
+This API call was added to `FileSystem` and `FileContext` in
+Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows.
+
+* Added `opt(key, long)` and `must(key, long)`.
+* Declared that `withFileStatus(null)` is allowed.
+* Declared that `withFileStatus(status)` only checks
+ the filename of the path, not the full path.
+ This is needed to support passthrough/mounted filesystems.
+* Added standard option keys.
+
+### <a name="openfile_path_"></a> `FutureDataInputStreamBuilder openFile(Path path)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct an operation to open the file at `path` for reading.
+
+When `build()` is invoked on the returned `FutureDataInputStreamBuilder` instance,
+the builder parameters are verified and
+`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or
+`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked.
+
+These protected methods return a `CompletableFuture<FSDataInputStream>`
+which, when its `get()` method is called, either returns an input
+stream of the contents of the opened file, or raises an exception.
+
+The base implementation of the `FileSystem.openFileWithOptions(PathHandle, OpenFileParameters)`
+ultimately invokes `FileSystem.open(Path, int)`.
+
+Thus the chain `FileSystem.openFile(path).build().get()` has the same preconditions
+and postconditions as `FileSystem.open(Path p, int bufferSize)`.
+
+However, there is one difference which implementations are free to
+take advantage of:
+
+The returned stream MAY implement a lazy open where file non-existence or
+access permission failures may not surface until the first `read()` of the
+actual data.
+
+This saves network IO on object stores.
+
+The `openFile()` operation MAY check the state of the filesystem during its
+invocation, but as the state of the filesystem may change betwen this call and
Review Comment:
typo: "between"
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -4799,23 +4802,26 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
@Retries.RetryTranslated
@AuditEntryPoint
private FSDataInputStream select(final Path source,
- final String expression,
final Configuration options,
- final Optional<S3AFileStatus> providedStatus)
+ final OpenFileSupport.OpenFileInformation fileInformation)
Review Comment:
Javadoc correction: remove the parameters that are no longer used from this method's javadoc.
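One possible shape of the corrected javadoc for the new signature; the wording below is illustrative, not taken from the PR:

/**
 * Execute an S3 Select request against the source object.
 * @param source path of the object to query
 * @param options request configuration
 * @param fileInformation resolved openFile() information, which now carries
 *        what the removed expression/providedStatus parameters used to
 * @return a stream of the selected results
 * @throws IOException on failure
 */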
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md:
##########
@@ -0,0 +1,122 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+# `FileSystem.openFile()`/`FileContext.openFile()`
+
+This is a method provided by both FileSystem and FileContext for
+advanced file opening options and, where implemented,
+an asynchronous/lazy opening of a file.
+
+Creates a builder to open a file, supporting options
+both standard and filesystem specific. The return
+value of the `build()` call is a `Future<FSDataInputStream>`,
+which must be waited on. The file opening may be
+asynchronous, and it may actually be postponed (including
+permission/existence checks) until reads are actually
+performed.
+
+This API call was added to `FileSystem` and `FileContext` in
+Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows.
+
+* Added `opt(key, long)` and `must(key, long)`.
+* Declared that `withFileStatus(null)` is allowed.
+* Declared that `withFileStatus(status)` only checks
+ the filename of the path, not the full path.
+ This is needed to support passthrough/mounted filesystems.
+* Added standard option keys.
+
+### <a name="openfile_path_"></a> `FutureDataInputStreamBuilder openFile(Path path)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct an operation to open the file at `path` for reading.
+
+When `build()` is invoked on the returned `FutureDataInputStreamBuilder` instance,
+the builder parameters are verified and
+`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or
+`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked.
+
+These protected methods return a `CompletableFuture<FSDataInputStream>`
+which, when its `get()` method is called, either returns an input
+stream of the contents of the opened file, or raises an exception.
+
+The base implementation of the `FileSystem.openFileWithOptions(PathHandle, OpenFileParameters)`
+ultimately invokes `FileSystem.open(Path, int)`.
+
+Thus the chain `FileSystem.openFile(path).build().get()` has the same preconditions
+and postconditions as `FileSystem.open(Path p, int bufferSize)`.
+
+However, there is one difference which implementations are free to
+take advantage of:
+
+The returned stream MAY implement a lazy open where file non-existence or
+access permission failures may not surface until the first `read()` of the
+actual data.
+
+This saves network IO on object stores.
+
+The `openFile()` operation MAY check the state of the filesystem during its
+invocation, but as the state of the filesystem may change betwen this call and
+the actual `build()` and `get()` operations, these file-specific
+preconditions (file exists, file is readable, etc.) MUST NOT be checked here.
+
+FileSystem implementations which do not implement `open(Path, int)`
+MAY postpone raising an `UnsupportedOperationException` until either the
+`FutureDataInputStreamBuilder.build()` or the subsequent `get()` call,
+else they MAY fail fast in the `openFile()` call.
+
+Consult [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html) for details
+on how to use the builder, and for standard options which may be pssed in.
Review Comment:
typo: "passed"
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+ private static final ChangeDetectionPolicy CHANGE_POLICY =
+ ChangeDetectionPolicy.createPolicy(
+ ChangeDetectionPolicy.Mode.Server,
+ ChangeDetectionPolicy.Source.None,
+ false);
+
+ private static final long READ_AHEAD_RANGE = 16;
+
+ private static final String USERNAME = "hadoop";
+
+ public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+ public static final String TESTFILE = "s3a://bucket/name";
+
+ private static final Path TESTPATH = new Path(TESTFILE);
+
+ /**
+ * Create an OpenFileSupport instance.
+ */
+ private static final OpenFileSupport PREPARE =
+ new OpenFileSupport(
+ CHANGE_POLICY,
+ READ_AHEAD_RANGE,
+ USERNAME,
+ IO_FILE_BUFFER_SIZE_DEFAULT,
+ DEFAULT_ASYNC_DRAIN_THRESHOLD,
+ INPUT_POLICY);
+
+ @Test
+ public void testSimpleFile() throws Throwable {
+ ObjectAssert<OpenFileSupport.OpenFileInformation>
+ asst = assertFileInfo(
+ PREPARE.openSimpleFile(1024));
+
+ asst.extracting(f -> f.getChangePolicy())
+ .isEqualTo(CHANGE_POLICY);
+ asst.extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ asst.extracting(f -> f.getReadAheadRange())
+ .isEqualTo(READ_AHEAD_RANGE);
+ }
+
+ /**
+ * Initiate an assert from an open file information instance.
+ * @param fi file info
+ * @return an assert stream.
+ */
+ private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+ final OpenFileSupport.OpenFileInformation fi) {
+ return Assertions.assertThat(fi)
+ .describedAs("File Information %s", fi);
+ }
+
+ /**
+ * Create an assertion about the openFile information from a configuration
+ * with the given key/value option.
+ * @param key key to set.
+ * @param option option value.
+ * @return the constructed OpenFileInformation.
+ */
+ public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+ final String key,
+ final String option) throws IOException {
+ return assertFileInfo(prepareToOpenFile(params(key, option)));
+ }
+
+ @Test
+ public void testUnknownMandatoryOption() throws Throwable {
+
+ String key = "unknown";
+ intercept(IllegalArgumentException.class, key, () ->
+ prepareToOpenFile(params(key, "undefined")));
+ }
+
+ @Test
+ public void testSeekRandomIOPolicy() throws Throwable {
+
+ // ask for random IO
+ String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+ // is picked up
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ // and as neither status nor length was set: no file status
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getStatus())
+ .isNull();
+ }
+
+ /**
+ * There's a standard policy name, 'adaptive',
+ * meaning 'whatever this stream does to adapt to the client's use'.
+ * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+ */
+ @Test
+ public void testSeekPolicyAdaptive() throws Throwable {
+
+ // when caller asks for adaptive, they get "normal"
+ assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ /**
+ * Verify that an unknown seek policy falls back to
+ * {@link S3AInputPolicy#Normal}. Delete
+ */
+ @Test
+ public void testUnknownSeekPolicyS3AOption() throws Throwable {
+ // fall back to the normal seek policy.
+ assertOpenFile(INPUT_FADVISE, "undefined")
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ }
+
+ /**
+ * The S3A option also supports a list of values.
+ */
+ @Test
+ public void testSeekPolicyListS3AOption() throws Throwable {
+ // fall back to the normal seek policy.
Review Comment:
nit: Correct the comment, since we aren't falling back but using one of the listed policies.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+ private static final ChangeDetectionPolicy CHANGE_POLICY =
+ ChangeDetectionPolicy.createPolicy(
+ ChangeDetectionPolicy.Mode.Server,
+ ChangeDetectionPolicy.Source.None,
+ false);
+
+ private static final long READ_AHEAD_RANGE = 16;
+
+ private static final String USERNAME = "hadoop";
+
+ public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+ public static final String TESTFILE = "s3a://bucket/name";
+
+ private static final Path TESTPATH = new Path(TESTFILE);
+
+ /**
+ * Create an OpenFileSupport instance.
+ */
+ private static final OpenFileSupport PREPARE =
+ new OpenFileSupport(
+ CHANGE_POLICY,
+ READ_AHEAD_RANGE,
+ USERNAME,
+ IO_FILE_BUFFER_SIZE_DEFAULT,
+ DEFAULT_ASYNC_DRAIN_THRESHOLD,
+ INPUT_POLICY);
+
+ @Test
+ public void testSimpleFile() throws Throwable {
+ ObjectAssert<OpenFileSupport.OpenFileInformation>
+ asst = assertFileInfo(
+ PREPARE.openSimpleFile(1024));
+
+ asst.extracting(f -> f.getChangePolicy())
+ .isEqualTo(CHANGE_POLICY);
+ asst.extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ asst.extracting(f -> f.getReadAheadRange())
+ .isEqualTo(READ_AHEAD_RANGE);
+ }
+
+ /**
+ * Initiate an assert from an open file information instance.
+ * @param fi file info
+ * @return an assert stream.
+ */
+ private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+ final OpenFileSupport.OpenFileInformation fi) {
+ return Assertions.assertThat(fi)
+ .describedAs("File Information %s", fi);
+ }
+
+ /**
+ * Create an assertion about the openFile information from a configuration
+ * with the given key/value option.
+ * @param key key to set.
+ * @param option option value.
+ * @return the constructed OpenFileInformation.
+ */
+ public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+ final String key,
+ final String option) throws IOException {
+ return assertFileInfo(prepareToOpenFile(params(key, option)));
+ }
+
+ @Test
+ public void testUnknownMandatoryOption() throws Throwable {
+
+ String key = "unknown";
+ intercept(IllegalArgumentException.class, key, () ->
+ prepareToOpenFile(params(key, "undefined")));
+ }
+
+ @Test
+ public void testSeekRandomIOPolicy() throws Throwable {
+
+ // ask for random IO
+ String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+ // is picked up
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ // and as neither status nor length was set: no file status
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getStatus())
+ .isNull();
+ }
+
+ /**
+ * There's a standard policy name, 'adaptive',
+ * meaning 'whatever this stream does to adapt to the client's use'.
+ * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+ */
+ @Test
+ public void testSeekPolicyAdaptive() throws Throwable {
+
+ // when caller asks for adaptive, they get "normal"
+ assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ /**
+ * Verify that an unknown seek policy falls back to
+ * {@link S3AInputPolicy#Normal}. Delete
Review Comment:
nit: "Delete" is a mistype?
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+ private static final ChangeDetectionPolicy CHANGE_POLICY =
+ ChangeDetectionPolicy.createPolicy(
+ ChangeDetectionPolicy.Mode.Server,
+ ChangeDetectionPolicy.Source.None,
+ false);
+
+ private static final long READ_AHEAD_RANGE = 16;
+
+ private static final String USERNAME = "hadoop";
+
+ public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+ public static final String TESTFILE = "s3a://bucket/name";
+
+ private static final Path TESTPATH = new Path(TESTFILE);
+
+ /**
+ * Create an OpenFileSupport instance.
+ */
+ private static final OpenFileSupport PREPARE =
+ new OpenFileSupport(
+ CHANGE_POLICY,
+ READ_AHEAD_RANGE,
+ USERNAME,
+ IO_FILE_BUFFER_SIZE_DEFAULT,
+ DEFAULT_ASYNC_DRAIN_THRESHOLD,
+ INPUT_POLICY);
+
+ @Test
+ public void testSimpleFile() throws Throwable {
+ ObjectAssert<OpenFileSupport.OpenFileInformation>
+ asst = assertFileInfo(
+ PREPARE.openSimpleFile(1024));
+
+ asst.extracting(f -> f.getChangePolicy())
+ .isEqualTo(CHANGE_POLICY);
+ asst.extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ asst.extracting(f -> f.getReadAheadRange())
+ .isEqualTo(READ_AHEAD_RANGE);
+ }
+
+ /**
+ * Initiate an assert from an open file information instance.
+ * @param fi file info
+ * @return an assert stream.
+ */
+ private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+ final OpenFileSupport.OpenFileInformation fi) {
+ return Assertions.assertThat(fi)
+ .describedAs("File Information %s", fi);
+ }
+
+ /**
+ * Create an assertion about the openFile information from a configuration
+ * with the given key/value option.
+ * @param key key to set.
+ * @param option option value.
+ * @return the constructed OpenFileInformation.
+ */
+ public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+ final String key,
+ final String option) throws IOException {
+ return assertFileInfo(prepareToOpenFile(params(key, option)));
+ }
+
+ @Test
+ public void testUnknownMandatoryOption() throws Throwable {
+
+ String key = "unknown";
+ intercept(IllegalArgumentException.class, key, () ->
+ prepareToOpenFile(params(key, "undefined")));
+ }
+
+ @Test
+ public void testSeekRandomIOPolicy() throws Throwable {
+
+ // ask for random IO
+ String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+ // is picked up
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ // and as neither status nor length was set: no file status
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getStatus())
+ .isNull();
+ }
+
+ /**
+ * There's a standard policy name, 'adaptive',
+ * meaning 'whatever this stream does to adapt to the client's use'.
+ * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+ */
+ @Test
+ public void testSeekPolicyAdaptive() throws Throwable {
+
+ // when caller asks for adaptive, they get "normal"
+ assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ /**
+ * Verify that an unknown seek policy falls back to
+ * {@link S3AInputPolicy#Normal}. Delete
+ */
+ @Test
+ public void testUnknownSeekPolicyS3AOption() throws Throwable {
+ // fall back to the normal seek policy.
+ assertOpenFile(INPUT_FADVISE, "undefined")
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ }
+
+ /**
+ * The S3A option also supports a list of values.
+ */
+ @Test
+ public void testSeekPolicyListS3AOption() throws Throwable {
+ // fall back to the normal seek policy.
+ assertOpenFile(INPUT_FADVISE, "hbase, random")
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ }
+
+ /**
+ * Verify that if a list of policies is supplied in a configuration,
+ * the first recognized policy will be adopted.
+ */
+ @Test
+ public void testSeekPolicyExtractionFromList() throws Throwable {
+ String plist = "a, b, RandOm, other ";
+ Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+ Collection<String> options = conf.getTrimmedStringCollection(
+ FS_OPTION_OPENFILE_READ_POLICY);
+ Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
+ .describedAs("Policy from " + plist)
+ .isEqualTo(S3AInputPolicy.Random);
+ }
+
+ @Test
+ public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+ Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+ .describedAs("adaptive")
+ .isEqualTo(S3AInputPolicy.Normal);
Review Comment:
Since at the moment normal, default, and adaptive all map to Normal, shouldn't we also verify that the isAdaptive boolean is true in this case?
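A sketch of the extra assertion this comment suggests; the isAdaptive() accessor name is inferred from the comment, not confirmed by the quoted diff:

Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
    .describedAs("adaptive")
    .isEqualTo(S3AInputPolicy.Normal)
    // hypothetical probe of the adaptive flag
    .matches(S3AInputPolicy::isAdaptive, "adaptive flag set");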
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+ private static final ChangeDetectionPolicy CHANGE_POLICY =
+ ChangeDetectionPolicy.createPolicy(
+ ChangeDetectionPolicy.Mode.Server,
+ ChangeDetectionPolicy.Source.None,
+ false);
+
+ private static final long READ_AHEAD_RANGE = 16;
+
+ private static final String USERNAME = "hadoop";
+
+ public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+ public static final String TESTFILE = "s3a://bucket/name";
+
+ private static final Path TESTPATH = new Path(TESTFILE);
+
+ /**
+ * Create an OpenFileSupport instance.
+ */
+ private static final OpenFileSupport PREPARE =
+ new OpenFileSupport(
+ CHANGE_POLICY,
+ READ_AHEAD_RANGE,
+ USERNAME,
+ IO_FILE_BUFFER_SIZE_DEFAULT,
+ DEFAULT_ASYNC_DRAIN_THRESHOLD,
+ INPUT_POLICY);
+
+ @Test
+ public void testSimpleFile() throws Throwable {
+ ObjectAssert<OpenFileSupport.OpenFileInformation>
+ asst = assertFileInfo(
+ PREPARE.openSimpleFile(1024));
+
+ asst.extracting(f -> f.getChangePolicy())
+ .isEqualTo(CHANGE_POLICY);
+ asst.extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ asst.extracting(f -> f.getReadAheadRange())
+ .isEqualTo(READ_AHEAD_RANGE);
+ }
+
+ /**
+ * Initiate an assert from an open file information instance.
+ * @param fi file info
+ * @return an assert stream.
+ */
+ private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+ final OpenFileSupport.OpenFileInformation fi) {
+ return Assertions.assertThat(fi)
+ .describedAs("File Information %s", fi);
+ }
+
+ /**
+ * Create an assertion about the openFile information from a configuration
+ * with the given key/value option.
+ * @param key key to set.
+ * @param option option value.
+ * @return the constructed OpenFileInformation.
+ */
+ public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+ final String key,
+ final String option) throws IOException {
+ return assertFileInfo(prepareToOpenFile(params(key, option)));
+ }
+
+ @Test
+ public void testUnknownMandatoryOption() throws Throwable {
+
+ String key = "unknown";
+ intercept(IllegalArgumentException.class, key, () ->
+ prepareToOpenFile(params(key, "undefined")));
+ }
+
+ @Test
+ public void testSeekRandomIOPolicy() throws Throwable {
+
+ // ask for random IO
+ String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+ // is picked up
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ // and as neither status nor length was set: no file status
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getStatus())
+ .isNull();
+ }
+
+ /**
+ * There's a standard policy name, 'adaptive',
+ * meaning 'whatever this stream does to adapt to the client's use'.
+ * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+ */
+ @Test
+ public void testSeekPolicyAdaptive() throws Throwable {
+
+ // when caller asks for adaptive, they get "normal"
+ assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ /**
+ * Verify that an unknown seek policy falls back to
+ * {@link S3AInputPolicy#Normal}. Delete
+ */
+ @Test
+ public void testUnknownSeekPolicyS3AOption() throws Throwable {
+ // fall back to the normal seek policy.
+ assertOpenFile(INPUT_FADVISE, "undefined")
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ }
+
+ /**
+ * The S3A option also supports a list of values.
+ */
+ @Test
+ public void testSeekPolicyListS3AOption() throws Throwable {
+ // fall back to the normal seek policy.
+ assertOpenFile(INPUT_FADVISE, "hbase, random")
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ }
+
+ /**
+ * Verify that if a list of policies is supplied in a configuration,
+ * the first recognized policy will be adopted.
+ */
+ @Test
+ public void testSeekPolicyExtractionFromList() throws Throwable {
+ String plist = "a, b, RandOm, other ";
Review Comment:
It would be better to include valid policies in this list to show that the first valid policy is picked up.
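One way to act on this suggestion, keeping the test's existing conf() helper; the exact list below is illustrative:

// "sequential" after "RandOm" shows the first valid policy wins,
// while "a" stays as an invalid leading entry.
String plist = "a, RandOm, sequential";
Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
Collection<String> options = conf.getTrimmedStringCollection(
    FS_OPTION_OPENFILE_READ_POLICY);
Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
    .describedAs("Policy from " + plist)
    .isEqualTo(S3AInputPolicy.Random);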
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+ private static final ChangeDetectionPolicy CHANGE_POLICY =
+ ChangeDetectionPolicy.createPolicy(
+ ChangeDetectionPolicy.Mode.Server,
+ ChangeDetectionPolicy.Source.None,
+ false);
+
+ private static final long READ_AHEAD_RANGE = 16;
+
+ private static final String USERNAME = "hadoop";
+
+ public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+ public static final String TESTFILE = "s3a://bucket/name";
+
+ private static final Path TESTPATH = new Path(TESTFILE);
+
+ /**
+ * Create an OpenFileSupport instance.
+ */
+ private static final OpenFileSupport PREPARE =
+ new OpenFileSupport(
+ CHANGE_POLICY,
+ READ_AHEAD_RANGE,
+ USERNAME,
+ IO_FILE_BUFFER_SIZE_DEFAULT,
+ DEFAULT_ASYNC_DRAIN_THRESHOLD,
+ INPUT_POLICY);
+
+ @Test
+ public void testSimpleFile() throws Throwable {
+ ObjectAssert<OpenFileSupport.OpenFileInformation>
+ asst = assertFileInfo(
+ PREPARE.openSimpleFile(1024));
+
+ asst.extracting(f -> f.getChangePolicy())
+ .isEqualTo(CHANGE_POLICY);
+ asst.extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ asst.extracting(f -> f.getReadAheadRange())
+ .isEqualTo(READ_AHEAD_RANGE);
+ }
+
+ /**
+ * Initiate an assert from an open file information instance.
+ * @param fi file info
+ * @return an assert stream.
+ */
+ private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+ final OpenFileSupport.OpenFileInformation fi) {
+ return Assertions.assertThat(fi)
+ .describedAs("File Information %s", fi);
+ }
+
+ /**
+ * Create an assertion about the openFile information from a configuration
+ * with the given key/value option.
+ * @param key key to set.
+ * @param option option value.
+ * @return the constructed OpenFileInformation.
+ */
+ public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+ final String key,
+ final String option) throws IOException {
+ return assertFileInfo(prepareToOpenFile(params(key, option)));
+ }
+
+ @Test
+ public void testUnknownMandatoryOption() throws Throwable {
+
+ String key = "unknown";
+ intercept(IllegalArgumentException.class, key, () ->
+ prepareToOpenFile(params(key, "undefined")));
+ }
+
+ @Test
+ public void testSeekRandomIOPolicy() throws Throwable {
+
+ // ask for random IO
+ String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+ // is picked up
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ // and as neither status nor length was set: no file status
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getStatus())
+ .isNull();
+ }
+
+ /**
+ * There's a standard policy name, 'adaptive',
+ * meaning 'whatever this stream does to adapt to the client's use'.
+ * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+ */
+ @Test
+ public void testSeekPolicyAdaptive() throws Throwable {
+
+ // when caller asks for adaptive, they get "normal"
+ assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ /**
+ * Verify that an unknown seek policy falls back to
+ * {@link S3AInputPolicy#Normal}. Delete
+ */
+ @Test
+ public void testUnknownSeekPolicyS3AOption() throws Throwable {
+ // fall back to the normal seek policy.
+ assertOpenFile(INPUT_FADVISE, "undefined")
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ }
+
+ /**
+ * The S3A option also supports a list of values.
+ */
+ @Test
+ public void testSeekPolicyListS3AOption() throws Throwable {
+ // fall back to the normal seek policy.
+ assertOpenFile(INPUT_FADVISE, "hbase, random")
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ }
+
+ /**
+ * Verify that if a list of policies is supplied in a configuration,
+ * the first recognized policy will be adopted.
+ */
+ @Test
+ public void testSeekPolicyExtractionFromList() throws Throwable {
+ String plist = "a, b, RandOm, other ";
+ Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+ Collection<String> options = conf.getTrimmedStringCollection(
+ FS_OPTION_OPENFILE_READ_POLICY);
+ Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
+ .describedAs("Policy from " + plist)
+ .isEqualTo(S3AInputPolicy.Random);
+ }
+
+ @Test
+ public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+ Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+ .describedAs("adaptive")
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ @Test
+ public void testUnknownSeekPolicyFallback() throws Throwable {
+ Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
+ .describedAs("unkown policy")
+ .isNull();
+ }
+
+ /**
+ * Test the mapping of the standard option names.
+ */
+ @Test
+ public void testInputPolicyMapping() throws Throwable {
+ Object[][] policyMapping = {
+ {"normal", S3AInputPolicy.Normal},
+ {FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE, S3AInputPolicy.Normal},
+ {FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, S3AInputPolicy.Normal},
+ {FS_OPTION_OPENFILE_READ_POLICY_RANDOM, S3AInputPolicy.Random},
+ {FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, S3AInputPolicy.Sequential},
+ };
+ for (Object[] mapping : policyMapping) {
+ String name = (String) mapping[0];
+ Assertions.assertThat(S3AInputPolicy.getPolicy(name, null))
+ .describedAs("Policy %s", name)
+ .isEqualTo(mapping[1]);
+ }
+ }
+
+ /**
+ * Verify readahead range is picked up.
+ */
+ @Test
+ public void testReadahead() throws Throwable {
+ // readahead range option
+ assertOpenFile(READAHEAD_RANGE, "4096")
+ .extracting(f -> f.getReadAheadRange())
+ .isEqualTo(4096L);
+ }
+
+ /** is
+ * Verify readahead range is picked up.
+ */
+ @Test
+ public void testBufferSize() throws Throwable {
+ // readahead range option
Review Comment:
nit: bufferSize.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+ private static final ChangeDetectionPolicy CHANGE_POLICY =
+ ChangeDetectionPolicy.createPolicy(
+ ChangeDetectionPolicy.Mode.Server,
+ ChangeDetectionPolicy.Source.None,
+ false);
+
+ private static final long READ_AHEAD_RANGE = 16;
+
+ private static final String USERNAME = "hadoop";
+
+ public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+ public static final String TESTFILE = "s3a://bucket/name";
+
+ private static final Path TESTPATH = new Path(TESTFILE);
+
+ /**
+ * Create an OpenFileSupport instance.
+ */
+ private static final OpenFileSupport PREPARE =
+ new OpenFileSupport(
+ CHANGE_POLICY,
+ READ_AHEAD_RANGE,
+ USERNAME,
+ IO_FILE_BUFFER_SIZE_DEFAULT,
+ DEFAULT_ASYNC_DRAIN_THRESHOLD,
+ INPUT_POLICY);
+
+ @Test
+ public void testSimpleFile() throws Throwable {
+ ObjectAssert<OpenFileSupport.OpenFileInformation>
+ asst = assertFileInfo(
+ PREPARE.openSimpleFile(1024));
+
+ asst.extracting(f -> f.getChangePolicy())
+ .isEqualTo(CHANGE_POLICY);
+ asst.extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ asst.extracting(f -> f.getReadAheadRange())
+ .isEqualTo(READ_AHEAD_RANGE);
+ }
+
+ /**
+ * Initiate an assert from an open file information instance.
+ * @param fi file info
+ * @return an assert stream.
+ */
+ private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+ final OpenFileSupport.OpenFileInformation fi) {
+ return Assertions.assertThat(fi)
+ .describedAs("File Information %s", fi);
+ }
+
+ /**
+ * Create an assertion about the openFile information from a configuration
+ * with the given key/value option.
+ * @param key key to set.
+ * @param option option value.
+ * @return the constructed OpenFileInformation.
+ */
+ public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+ final String key,
+ final String option) throws IOException {
+ return assertFileInfo(prepareToOpenFile(params(key, option)));
+ }
+
+ @Test
+ public void testUnknownMandatoryOption() throws Throwable {
+
+ String key = "unknown";
+ intercept(IllegalArgumentException.class, key, () ->
+ prepareToOpenFile(params(key, "undefined")));
+ }
+
+ @Test
+ public void testSeekRandomIOPolicy() throws Throwable {
+
+ // ask for random IO
+ String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+ // is picked up
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ // and as neither status nor length was set: no file status
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getStatus())
+ .isNull();
+ }
+
+ /**
+ * There's a standard policy name, 'adaptive',
+ * meaning 'whatever this stream does to adapt to the client's use'.
+ * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+ */
+ @Test
+ public void testSeekPolicyAdaptive() throws Throwable {
+
+ // when caller asks for adaptive, they get "normal"
+ assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ /**
+ * Verify that an unknown seek policy falls back to
+ * {@link S3AInputPolicy#Normal}. Delete
+ */
+ @Test
+ public void testUnknownSeekPolicyS3AOption() throws Throwable {
+ // fall back to the normal seek policy.
+ assertOpenFile(INPUT_FADVISE, "undefined")
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ }
+
+ /**
+ * The S3A option also supports a list of values.
+ */
+ @Test
+ public void testSeekPolicyListS3AOption() throws Throwable {
+ // fall back to the normal seek policy.
+ assertOpenFile(INPUT_FADVISE, "hbase, random")
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ }
+
+ /**
+ * Verify that if a list of policies is supplied in a configuration,
+ * the first recognized policy will be adopted.
+ */
+ @Test
+ public void testSeekPolicyExtractionFromList() throws Throwable {
+ String plist = "a, b, RandOm, other ";
+ Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+ Collection<String> options = conf.getTrimmedStringCollection(
+ FS_OPTION_OPENFILE_READ_POLICY);
+ Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
+ .describedAs("Policy from " + plist)
+ .isEqualTo(S3AInputPolicy.Random);
+ }
+
+ @Test
+ public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+ Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+ .describedAs("adaptive")
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ @Test
+ public void testUnknownSeekPolicyFallback() throws Throwable {
+ Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
+ .describedAs("unkown policy")
+ .isNull();
+ }
+
+ /**
+ * Test the mapping of the standard option names.
+ */
+ @Test
+ public void testInputPolicyMapping() throws Throwable {
+ Object[][] policyMapping = {
+ {"normal", S3AInputPolicy.Normal},
+ {FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE, S3AInputPolicy.Normal},
+ {FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, S3AInputPolicy.Normal},
+ {FS_OPTION_OPENFILE_READ_POLICY_RANDOM, S3AInputPolicy.Random},
+ {FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, S3AInputPolicy.Sequential},
+ };
+ for (Object[] mapping : policyMapping) {
+ String name = (String) mapping[0];
+ Assertions.assertThat(S3AInputPolicy.getPolicy(name, null))
+ .describedAs("Policy %s", name)
+ .isEqualTo(mapping[1]);
+ }
+ }
+
+ /**
+ * Verify readahead range is picked up.
+ */
+ @Test
+ public void testReadahead() throws Throwable {
+ // readahead range option
+ assertOpenFile(READAHEAD_RANGE, "4096")
+ .extracting(f -> f.getReadAheadRange())
+ .isEqualTo(4096L);
+ }
+
+ /** is
+ * Verify readahead range is picked up.
+ */
+ @Test
Review Comment:
nit: Javadoc correction.
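For instance, the corrected javadoc could simply read (illustrative wording, combining this nit with the earlier bufferSize one):

/**
 * Verify the buffer size option is picked up.
 */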
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+
+import java.io.EOFException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Cost of openFile().
+ */
+public class ITestS3AOpenCost extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3AOpenCost.class);
+
+ private Path testFile;
+
+ private FileStatus testFileStatus;
+
+ private long fileLength;
+
+ public ITestS3AOpenCost() {
+ super(true);
+ }
+
+ /**
+ * Setup creates a test file, saves its status and length
+ * to fields.
+ */
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ S3AFileSystem fs = getFileSystem();
+ testFile = methodPath();
+
+ writeTextFile(fs, testFile, "openfile", true);
+ testFileStatus = fs.getFileStatus(testFile);
+ fileLength = testFileStatus.getLen();
+ }
+
+ /**
+ * Test at which point openFile() performs GET requests when file status
+ * and length options are passed down.
+ * Note that the input streams only update the FS statistics
+ * in close(), so metrics cannot be verified until all operations
+ * on a stream are complete.
+ * This is slightly less than ideal.
+ */
+ @Test
+ public void testOpenFileWithStatusOfOtherFS() throws Throwable {
+ describe("Test cost of openFile with/without status; raw only");
+ S3AFileSystem fs = getFileSystem();
+
+ // now read that file back in using the openFile call.
+ // with a new FileStatus and a different path.
+ // this verifies that any FileStatus class/subclass is used
+ // as a source of the file length.
+ FileStatus st2 = new FileStatus(
+ fileLength, false,
+ testFileStatus.getReplication(),
+ testFileStatus.getBlockSize(),
+ testFileStatus.getModificationTime(),
+ testFileStatus.getAccessTime(),
+ testFileStatus.getPermission(),
+ testFileStatus.getOwner(),
+ testFileStatus.getGroup(),
+ new Path("gopher:///localhost/" + testFile.getName()));
+
+ // no IO in open
+ FSDataInputStream in = verifyMetrics(() ->
+ fs.openFile(testFile)
+ .withFileStatus(st2)
+ .build()
+ .get(),
+ always(NO_IO),
+ with(STREAM_READ_OPENED, 0));
+
+ // the stream gets opened during read
+ long readLen = verifyMetrics(() ->
+ readStream(in),
+ always(NO_IO),
+ with(STREAM_READ_OPENED, 1));
+ assertEquals("bytes read from file", fileLength, readLen);
+ }
+
+ @Test
+ public void testOpenFileShorterLength() throws Throwable {
+ // do a second read with the length declared as short.
+ // we now expect the bytes read to be shorter.
+ S3AFileSystem fs = getFileSystem();
+
+ S3ATestUtils.MetricDiff bytesDiscarded =
+ new S3ATestUtils.MetricDiff(fs, STREAM_READ_BYTES_READ_CLOSE);
+ int offset = 2;
+ long shortLen = fileLength - offset;
+ // open the file
+ FSDataInputStream in2 = verifyMetrics(() ->
+ fs.openFile(testFile)
+ .must(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+ .opt(FS_OPTION_OPENFILE_LENGTH, shortLen)
+ .build()
+ .get(),
+ always(NO_IO),
+ with(STREAM_READ_OPENED, 0));
+
+ // verify that the statistics are in rane
Review Comment:
nit: "range"
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java:
##########
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.ObjectAssert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static java.util.Collections.singleton;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for {@link OpenFileSupport} and the associated
+ * seek policy lookup in {@link S3AInputPolicy}.
+ */
+public class TestOpenFileSupport extends HadoopTestBase {
+
+ private static final ChangeDetectionPolicy CHANGE_POLICY =
+ ChangeDetectionPolicy.createPolicy(
+ ChangeDetectionPolicy.Mode.Server,
+ ChangeDetectionPolicy.Source.None,
+ false);
+
+ private static final long READ_AHEAD_RANGE = 16;
+
+ private static final String USERNAME = "hadoop";
+
+ public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
+
+ public static final String TESTFILE = "s3a://bucket/name";
+
+ private static final Path TESTPATH = new Path(TESTFILE);
+
+ /**
+ * Create an OpenFileSupport instance.
+ */
+ private static final OpenFileSupport PREPARE =
+ new OpenFileSupport(
+ CHANGE_POLICY,
+ READ_AHEAD_RANGE,
+ USERNAME,
+ IO_FILE_BUFFER_SIZE_DEFAULT,
+ DEFAULT_ASYNC_DRAIN_THRESHOLD,
+ INPUT_POLICY);
+
+ @Test
+ public void testSimpleFile() throws Throwable {
+ ObjectAssert<OpenFileSupport.OpenFileInformation>
+ asst = assertFileInfo(
+ PREPARE.openSimpleFile(1024));
+
+ asst.extracting(f -> f.getChangePolicy())
+ .isEqualTo(CHANGE_POLICY);
+ asst.extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ asst.extracting(f -> f.getReadAheadRange())
+ .isEqualTo(READ_AHEAD_RANGE);
+ }
+
+ /**
+ * Initiate an assert from an open file information instance.
+ * @param fi file info
+ * @return an ObjectAssert for chained checks.
+ */
+ private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo(
+ final OpenFileSupport.OpenFileInformation fi) {
+ return Assertions.assertThat(fi)
+ .describedAs("File Information %s", fi);
+ }
+
+ /**
+ * Create an assertion about the openFile information from a configuration
+ * with the given key/value option.
+ * @param key key to set.
+ * @param option option value.
+ * @return an assertion over the constructed OpenFileInformation.
+ */
+ public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile(
+ final String key,
+ final String option) throws IOException {
+ return assertFileInfo(prepareToOpenFile(params(key, option)));
+ }
+
+ @Test
+ public void testUnknownMandatoryOption() throws Throwable {
+
+ String key = "unknown";
+ intercept(IllegalArgumentException.class, key, () ->
+ prepareToOpenFile(params(key, "undefined")));
+ }
+
+ @Test
+ public void testSeekRandomIOPolicy() throws Throwable {
+
+ // ask for random IO
+ String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+
+ // is picked up
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ // and as neither status nor length was set: no file status
+ assertOpenFile(INPUT_FADVISE, option)
+ .extracting(f -> f.getStatus())
+ .isNull();
+ }
+
+ /**
+ * There's a standard policy name, 'adaptive',
+ * meaning 'whatever this stream does to adapt to the client's use'.
+ * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}.
+ */
+ @Test
+ public void testSeekPolicyAdaptive() throws Throwable {
+
+ // when caller asks for adaptive, they get "normal"
+ assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ /**
+ * Verify that an unknown seek policy falls back to
+ * the default input policy.
+ */
+ @Test
+ public void testUnknownSeekPolicyS3AOption() throws Throwable {
+ // fall back to the normal seek policy.
+ assertOpenFile(INPUT_FADVISE, "undefined")
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(INPUT_POLICY);
+ }
+
+ /**
+ * The S3A option also supports a list of values.
+ */
+ @Test
+ public void testSeekPolicyListS3AOption() throws Throwable {
+ // the first recognized policy in the list is picked up.
+ assertOpenFile(INPUT_FADVISE, "hbase, random")
+ .extracting(f -> f.getInputPolicy())
+ .isEqualTo(S3AInputPolicy.Random);
+ }
+
+ /**
+ * Verify that if a list of policies is supplied in a configuration,
+ * the first recognized policy will be adopted.
+ */
+ @Test
+ public void testSeekPolicyExtractionFromList() throws Throwable {
+ String plist = "a, b, RandOm, other ";
+ Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+ Collection<String> options = conf.getTrimmedStringCollection(
+ FS_OPTION_OPENFILE_READ_POLICY);
+ Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
+ .describedAs("Policy from " + plist)
+ .isEqualTo(S3AInputPolicy.Random);
+ }
+
+ @Test
+ public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+ Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+ .describedAs("adaptive")
+ .isEqualTo(S3AInputPolicy.Normal);
+ }
+
+ @Test
+ public void testUnknownSeekPolicyFallback() throws Throwable {
+ Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
+ .describedAs("unkown policy")
Review Comment:
typo: "unknown"
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+
+import java.io.EOFException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Cost of openFile().
+ */
+public class ITestS3AOpenCost extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3AOpenCost.class);
+
+ private Path testFile;
+
+ private FileStatus testFileStatus;
+
+ private long fileLength;
+
+ public ITestS3AOpenCost() {
+ super(true);
+ }
+
+ /**
+ * Setup creates a test file, saves its status and length
+ * to fields.
+ */
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ S3AFileSystem fs = getFileSystem();
+ testFile = methodPath();
+
+ writeTextFile(fs, testFile, "openfile", true);
+ testFileStatus = fs.getFileStatus(testFile);
+ fileLength = testFileStatus.getLen();
+ }
+
+ /**
+ * Test at which point openFile() performs GET requests when file status
+ * and length options are passed down.
+ * Note that the input streams only update the FS statistics
+ * in close(), so metrics cannot be verified until all operations
+ * on a stream are complete.
+ * This is slightly less than ideal.
+ */
+ @Test
+ public void testOpenFileWithStatusOfOtherFS() throws Throwable {
+ describe("Test cost of openFile with/without status; raw only");
+ S3AFileSystem fs = getFileSystem();
+
+ // now read that file back in using the openFile call.
+ // with a new FileStatus and a different path.
+ // this verifies that any FileStatus class/subclass is used
+ // as a source of the file length.
+ FileStatus st2 = new FileStatus(
+ fileLength, false,
+ testFileStatus.getReplication(),
+ testFileStatus.getBlockSize(),
+ testFileStatus.getModificationTime(),
+ testFileStatus.getAccessTime(),
+ testFileStatus.getPermission(),
+ testFileStatus.getOwner(),
+ testFileStatus.getGroup(),
+ new Path("gopher:///localhost/" + testFile.getName()));
+
+ // no IO in open
+ FSDataInputStream in = verifyMetrics(() ->
+ fs.openFile(testFile)
+ .withFileStatus(st2)
+ .build()
+ .get(),
+ always(NO_IO),
+ with(STREAM_READ_OPENED, 0));
+
+ // the stream gets opened during read
+ long readLen = verifyMetrics(() ->
+ readStream(in),
+ always(NO_IO),
Review Comment:
Little doubt here: when we read after opening the file, are we still at 0 IO?
Issue Time Tracking
-------------------
Worklog Id: (was: 755223)
Time Spent: 17h 20m (was: 17h 10m)
> Enhance openFile() for better read performance against object stores
> ---------------------------------------------------------------------
>
> Key: HADOOP-16202
> URL: https://issues.apache.org/jira/browse/HADOOP-16202
> Project: Hadoop Common
> Issue Type: Bug
> Components: fs, fs/s3, tools/distcp
> Affects Versions: 3.3.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
> Time Spent: 17h 20m
> Remaining Estimate: 0h
>
> The {{openFile()}} builder API lets us add new options when reading a file.
> Add an option {{"fs.s3a.open.option.length"}} which takes a long and allows
> the length of the file to be declared. If set, *no check for the existence of
> the file is issued when opening the file* (see the sketch below).
> Also: allow withFileStatus() to take any FileStatus implementation, rather than
> only S3AFileStatus, and not check that the path matches the path being
> opened. This is needed to support viewFS-style wrapping and mounting.
> Finally, adopt the new options where appropriate to stop clusters with S3A
> reads switched to random IO from killing download/localization:
> * fs shell copyToLocal
> * distcp
> * IOUtils.copy
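For illustration only, a minimal caller-side sketch of the length option described above, modelled on the tests in this PR; the {{path}} and {{knownLength}} names are hypothetical, and the static option keys are the ones imported in ITestS3AOpenCost:

    // Editorial sketch, not part of the patch: open a file whose length is
    // already known (e.g. from a directory listing), letting the S3A client
    // skip the HEAD probe normally issued at open time.
    FSDataInputStream in = fs.openFile(path)
        .must(FS_OPTION_OPENFILE_READ_POLICY,
            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
        .opt(FS_OPTION_OPENFILE_LENGTH, knownLength)
        .build()
        .get();  // future completes without any existence check; a missing
                 // file may only surface on the first read()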