[ https://issues.apache.org/jira/browse/HADOOP-16202?focusedWorklogId=755223&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755223 ]
ASF GitHub Bot logged work on HADOOP-16202: ------------------------------------------- Author: ASF GitHub Bot Created on: 11/Apr/22 13:34 Start Date: 11/Apr/22 13:34 Worklog Time Spent: 10m Work Description: mehakmeet commented on code in PR #2584: URL: https://github.com/apache/hadoop/pull/2584#discussion_r847102156 ########## hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md: ########## @@ -0,0 +1,122 @@ +<!--- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# `FileSystem.openFile()`/`FileContext.openFile()` + +This is a method provided by both FileSystem and FileContext for +advanced file opening options and, where implemented, +an asynchrounous/lazy opening of a file. + +Creates a builder to open a file, supporting options +both standard and filesystem specific. The return +value of the `build()` call is a `Future<FSDataInputStream>`, +which must be waited on. The file opening may be +asynchronous, and it may actually be postponed (including +permission/existence checks) until reads are actually +performed. + +This API call was added to `FileSystem` and `FileContext` in +Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows. + +* Added `opt(key, long)` and `must(key, long)`. +* Declared that `withFileStatus(null)` is allowed. +* Declared that `withFileStatus(status)` only checks + the filename of the path, not the full path. + This is needed to support passthrough/mounted filesystems. 
+* Added standard option keys. + +### <a name="openfile_path_"></a> `FutureDataInputStreamBuilder openFile(Path path)` + +Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html) +to construct a operation to open the file at `path` for reading. + +When `build()` is invoked on the returned `FutureDataInputStreamBuilder` instance, +the builder parameters are verified and +`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or +`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked. + +These protected methods returns a `CompletableFuture<FSDataInputStream>` +which, when its `get()` method is called, either returns an input +stream of the contents of opened file, or raises an exception. + +The base implementation of the `FileSystem.openFileWithOptions(PathHandle, OpenFileParameters)` +ultimately invokes `FileSystem.open(Path, int)`. + +Thus the chain `FileSystem.openFile(path).build().get()` has the same preconditions +and postconditions as `FileSystem.open(Path p, int bufferSize)` + +However, there is one difference which implementations are free to +take advantage of: + +The returned stream MAY implement a lazy open where file non-existence or +access permission failures may not surface until the first `read()` of the +actual data. + +This saves network IO on object stores. 
+ +The `openFile()` operation MAY check the state of the filesystem during its +invocation, but as the state of the filesystem may change betwen this call and Review Comment: typo: "between" ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java: ########## @@ -4799,23 +4802,26 @@ public AWSCredentialProviderList shareCredentials(final String purpose) { @Retries.RetryTranslated @AuditEntryPoint private FSDataInputStream select(final Path source, - final String expression, final Configuration options, - final Optional<S3AFileStatus> providedStatus) + final OpenFileSupport.OpenFileInformation fileInformation) Review Comment: Javadoc correction: Remove params not needed from javadocs of this method. ########## hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md: ########## @@ -0,0 +1,122 @@ +<!--- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# `FileSystem.openFile()`/`FileContext.openFile()` + +This is a method provided by both FileSystem and FileContext for +advanced file opening options and, where implemented, +an asynchrounous/lazy opening of a file. + +Creates a builder to open a file, supporting options +both standard and filesystem specific. The return +value of the `build()` call is a `Future<FSDataInputStream>`, +which must be waited on. 
The file opening may be +asynchronous, and it may actually be postponed (including +permission/existence checks) until reads are actually +performed. + +This API call was added to `FileSystem` and `FileContext` in +Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows. + +* Added `opt(key, long)` and `must(key, long)`. +* Declared that `withFileStatus(null)` is allowed. +* Declared that `withFileStatus(status)` only checks + the filename of the path, not the full path. + This is needed to support passthrough/mounted filesystems. +* Added standard option keys. + +### <a name="openfile_path_"></a> `FutureDataInputStreamBuilder openFile(Path path)` + +Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html) +to construct a operation to open the file at `path` for reading. + +When `build()` is invoked on the returned `FutureDataInputStreamBuilder` instance, +the builder parameters are verified and +`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or +`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked. + +These protected methods returns a `CompletableFuture<FSDataInputStream>` +which, when its `get()` method is called, either returns an input +stream of the contents of opened file, or raises an exception. + +The base implementation of the `FileSystem.openFileWithOptions(PathHandle, OpenFileParameters)` +ultimately invokes `FileSystem.open(Path, int)`. + +Thus the chain `FileSystem.openFile(path).build().get()` has the same preconditions +and postconditions as `FileSystem.open(Path p, int bufferSize)` + +However, there is one difference which implementations are free to +take advantage of: + +The returned stream MAY implement a lazy open where file non-existence or +access permission failures may not surface until the first `read()` of the +actual data. + +This saves network IO on object stores. 
+ +The `openFile()` operation MAY check the state of the filesystem during its +invocation, but as the state of the filesystem may change betwen this call and +the actual `build()` and `get()` operations, this file-specific +preconditions (file exists, file is readable, etc) MUST NOT be checked here. + +FileSystem implementations which do not implement `open(Path, int)` +MAY postpone raising an `UnsupportedOperationException` until either the +`FutureDataInputStreamBuilder.build()` or the subsequent `get()` call, +else they MAY fail fast in the `openFile()` call. + +Consult [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html) for details +on how to use the builder, and for standard options which may be pssed in. Review Comment: typo: "passed" ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java: ########## @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.ObjectAssert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; +import org.apache.hadoop.test.HadoopTestBase; + +import static java.util.Collections.singleton; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; +import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Unit tests for {@link 
OpenFileSupport} and the associated + * seek policy lookup in {@link S3AInputPolicy}. + */ +public class TestOpenFileSupport extends HadoopTestBase { + + private static final ChangeDetectionPolicy CHANGE_POLICY = + ChangeDetectionPolicy.createPolicy( + ChangeDetectionPolicy.Mode.Server, + ChangeDetectionPolicy.Source.None, + false); + + private static final long READ_AHEAD_RANGE = 16; + + private static final String USERNAME = "hadoop"; + + public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential; + + public static final String TESTFILE = "s3a://bucket/name"; + + private static final Path TESTPATH = new Path(TESTFILE); + + /** + * Create a OpenFileSupport instance. + */ + private static final OpenFileSupport PREPARE = + new OpenFileSupport( + CHANGE_POLICY, + READ_AHEAD_RANGE, + USERNAME, + IO_FILE_BUFFER_SIZE_DEFAULT, + DEFAULT_ASYNC_DRAIN_THRESHOLD, + INPUT_POLICY); + + @Test + public void testSimpleFile() throws Throwable { + ObjectAssert<OpenFileSupport.OpenFileInformation> + asst = assertFileInfo( + PREPARE.openSimpleFile(1024)); + + asst.extracting(f -> f.getChangePolicy()) + .isEqualTo(CHANGE_POLICY); + asst.extracting(f -> f.getInputPolicy()) + .isEqualTo(INPUT_POLICY); + asst.extracting(f -> f.getReadAheadRange()) + .isEqualTo(READ_AHEAD_RANGE); + } + + /** + * Initiate an assert from an open file information instance. + * @param fi file info + * @return an assert stream. + */ + private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo( + final OpenFileSupport.OpenFileInformation fi) { + return Assertions.assertThat(fi) + .describedAs("File Information %s", fi); + } + + /** + * Create an assertion about the openFile information from a configuration + * with the given key/value option. + * @param key key to set. + * @param option option value. + * @return the constructed OpenFileInformation. 
+ */ + public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile( + final String key, + final String option) throws IOException { + return assertFileInfo(prepareToOpenFile(params(key, option))); + } + + @Test + public void testUnknownMandatoryOption() throws Throwable { + + String key = "unknown"; + intercept(IllegalArgumentException.class, key, () -> + prepareToOpenFile(params(key, "undefined"))); + } + + @Test + public void testSeekRandomIOPolicy() throws Throwable { + + // ask for random IO + String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM; + + // is picked up + assertOpenFile(INPUT_FADVISE, option) + .extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Random); + // and as neither status nor length was set: no file status + assertOpenFile(INPUT_FADVISE, option) + .extracting(f -> f.getStatus()) + .isNull(); + } + + /** + * There's a standard policy name. 'adaptive', + * meaning 'whatever this stream does to adapt to the client's use'. + * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}. + */ + @Test + public void testSeekPolicyAdaptive() throws Throwable { + + // when caller asks for adaptive, they get "normal" + assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE) + .extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Normal); + } + + /** + * Verify that an unknown seek policy falls back to + * {@link S3AInputPolicy#Normal}. Delete + */ + @Test + public void testUnknownSeekPolicyS3AOption() throws Throwable { + // fall back to the normal seek policy. + assertOpenFile(INPUT_FADVISE, "undefined") + .extracting(f -> f.getInputPolicy()) + .isEqualTo(INPUT_POLICY); + } + + /** + * The S3A option also supports a list of values. + */ + @Test + public void testSeekPolicyListS3AOption() throws Throwable { + // fall back to the normal seek policy. Review Comment: nit: Correct the comment. Since we aren't falling back, but using one of the listed policy. 
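The review comments above concern how a comma-separated read-policy list such as `"hbase, random"` is reduced to the first recognized policy. As a self-contained illustration of that selection logic — a sketch only, not the actual `S3AInputPolicy` code; the enum and method names here are hypothetical stand-ins — the pattern looks like this:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.Optional;

/**
 * Illustrative sketch (NOT the real S3AInputPolicy implementation) of
 * reducing a read-policy list such as "hbase, random" to the first
 * recognized policy, falling back to a default when nothing matches.
 */
public class PolicyListSketch {

  /** Hypothetical stand-in for the S3A input policy enum. */
  enum InputPolicy { Normal, Sequential, Random }

  /** Map a single policy name to a policy; empty if unrecognized. */
  static Optional<InputPolicy> lookup(String name) {
    switch (name.trim().toLowerCase(Locale.ROOT)) {
      case "normal":
      case "default":
      case "adaptive":
        // normal/default/adaptive all map to the adaptive "Normal" policy
        return Optional.of(InputPolicy.Normal);
      case "sequential":
        return Optional.of(InputPolicy.Sequential);
      case "random":
        return Optional.of(InputPolicy.Random);
      default:
        return Optional.empty();
    }
  }

  /** First recognized policy in the list, else the supplied fallback. */
  static InputPolicy firstSupported(Collection<String> names,
      InputPolicy fallback) {
    return names.stream()
        .map(PolicyListSketch::lookup)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .findFirst()
        .orElse(fallback);
  }

  public static void main(String[] args) {
    // "hbase" is unrecognized, so "random" is picked up
    System.out.println(
        firstSupported(Arrays.asList("hbase", "random"),
            InputPolicy.Sequential)); // Random
    // nothing recognized: fall back to the configured default
    System.out.println(
        firstSupported(Arrays.asList("undefined"),
            InputPolicy.Sequential)); // Sequential
  }
}
```

This matches the behavior the tests exercise: `testSeekPolicyListS3AOption` expects the first valid entry of `"hbase, random"` to win, and `testUnknownSeekPolicyS3AOption` expects an unrecognized single value to fall back to the default policy.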
########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java: ########## @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.ObjectAssert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; +import org.apache.hadoop.test.HadoopTestBase; + +import static java.util.Collections.singleton; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; +import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Unit tests for {@link OpenFileSupport} and the associated + * seek policy lookup in {@link S3AInputPolicy}. + */ +public class TestOpenFileSupport extends HadoopTestBase { + + private static final ChangeDetectionPolicy CHANGE_POLICY = + ChangeDetectionPolicy.createPolicy( + ChangeDetectionPolicy.Mode.Server, + ChangeDetectionPolicy.Source.None, + false); + + private static final long READ_AHEAD_RANGE = 16; + + private static final String USERNAME = "hadoop"; + + public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential; + + public static final String TESTFILE = "s3a://bucket/name"; + + private static final Path TESTPATH = new Path(TESTFILE); + + /** + * Create a OpenFileSupport instance. 
+ */ + private static final OpenFileSupport PREPARE = + new OpenFileSupport( + CHANGE_POLICY, + READ_AHEAD_RANGE, + USERNAME, + IO_FILE_BUFFER_SIZE_DEFAULT, + DEFAULT_ASYNC_DRAIN_THRESHOLD, + INPUT_POLICY); + + @Test + public void testSimpleFile() throws Throwable { + ObjectAssert<OpenFileSupport.OpenFileInformation> + asst = assertFileInfo( + PREPARE.openSimpleFile(1024)); + + asst.extracting(f -> f.getChangePolicy()) + .isEqualTo(CHANGE_POLICY); + asst.extracting(f -> f.getInputPolicy()) + .isEqualTo(INPUT_POLICY); + asst.extracting(f -> f.getReadAheadRange()) + .isEqualTo(READ_AHEAD_RANGE); + } + + /** + * Initiate an assert from an open file information instance. + * @param fi file info + * @return an assert stream. + */ + private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo( + final OpenFileSupport.OpenFileInformation fi) { + return Assertions.assertThat(fi) + .describedAs("File Information %s", fi); + } + + /** + * Create an assertion about the openFile information from a configuration + * with the given key/value option. + * @param key key to set. + * @param option option value. + * @return the constructed OpenFileInformation. 
+ */ + public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile( + final String key, + final String option) throws IOException { + return assertFileInfo(prepareToOpenFile(params(key, option))); + } + + @Test + public void testUnknownMandatoryOption() throws Throwable { + + String key = "unknown"; + intercept(IllegalArgumentException.class, key, () -> + prepareToOpenFile(params(key, "undefined"))); + } + + @Test + public void testSeekRandomIOPolicy() throws Throwable { + + // ask for random IO + String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM; + + // is picked up + assertOpenFile(INPUT_FADVISE, option) + .extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Random); + // and as neither status nor length was set: no file status + assertOpenFile(INPUT_FADVISE, option) + .extracting(f -> f.getStatus()) + .isNull(); + } + + /** + * There's a standard policy name. 'adaptive', + * meaning 'whatever this stream does to adapt to the client's use'. + * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}. + */ + @Test + public void testSeekPolicyAdaptive() throws Throwable { + + // when caller asks for adaptive, they get "normal" + assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE) + .extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Normal); + } + + /** + * Verify that an unknown seek policy falls back to + * {@link S3AInputPolicy#Normal}. Delete Review Comment: nit: "Delete" is a mistype? ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java: ########## @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.ObjectAssert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; +import org.apache.hadoop.test.HadoopTestBase; + +import static java.util.Collections.singleton; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM; +import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; +import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Unit tests for {@link OpenFileSupport} and the associated + * seek policy lookup in {@link S3AInputPolicy}. + */ +public class TestOpenFileSupport extends HadoopTestBase { + + private static final ChangeDetectionPolicy CHANGE_POLICY = + ChangeDetectionPolicy.createPolicy( + ChangeDetectionPolicy.Mode.Server, + ChangeDetectionPolicy.Source.None, + false); + + private static final long READ_AHEAD_RANGE = 16; + + private static final String USERNAME = "hadoop"; + + public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential; + + public static final String TESTFILE = "s3a://bucket/name"; + + private static final Path TESTPATH = new Path(TESTFILE); + + /** + * Create a OpenFileSupport instance. + */ + private static final OpenFileSupport PREPARE = + new OpenFileSupport( + CHANGE_POLICY, + READ_AHEAD_RANGE, + USERNAME, + IO_FILE_BUFFER_SIZE_DEFAULT, + DEFAULT_ASYNC_DRAIN_THRESHOLD, + INPUT_POLICY); + + @Test + public void testSimpleFile() throws Throwable { + ObjectAssert<OpenFileSupport.OpenFileInformation> + asst = assertFileInfo( + PREPARE.openSimpleFile(1024)); + + asst.extracting(f -> f.getChangePolicy()) + .isEqualTo(CHANGE_POLICY); + asst.extracting(f -> f.getInputPolicy()) + .isEqualTo(INPUT_POLICY); + asst.extracting(f -> f.getReadAheadRange()) + .isEqualTo(READ_AHEAD_RANGE); + } + + /** + * Initiate an assert from an open file information instance. 
+ * @param fi file info + * @return an assert stream. + */ + private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo( + final OpenFileSupport.OpenFileInformation fi) { + return Assertions.assertThat(fi) + .describedAs("File Information %s", fi); + } + + /** + * Create an assertion about the openFile information from a configuration + * with the given key/value option. + * @param key key to set. + * @param option option value. + * @return the constructed OpenFileInformation. + */ + public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile( + final String key, + final String option) throws IOException { + return assertFileInfo(prepareToOpenFile(params(key, option))); + } + + @Test + public void testUnknownMandatoryOption() throws Throwable { + + String key = "unknown"; + intercept(IllegalArgumentException.class, key, () -> + prepareToOpenFile(params(key, "undefined"))); + } + + @Test + public void testSeekRandomIOPolicy() throws Throwable { + + // ask for random IO + String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM; + + // is picked up + assertOpenFile(INPUT_FADVISE, option) + .extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Random); + // and as neither status nor length was set: no file status + assertOpenFile(INPUT_FADVISE, option) + .extracting(f -> f.getStatus()) + .isNull(); + } + + /** + * There's a standard policy name. 'adaptive', + * meaning 'whatever this stream does to adapt to the client's use'. + * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}. + */ + @Test + public void testSeekPolicyAdaptive() throws Throwable { + + // when caller asks for adaptive, they get "normal" + assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE) + .extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Normal); + } + + /** + * Verify that an unknown seek policy falls back to + * {@link S3AInputPolicy#Normal}. 
Delete + */ + @Test + public void testUnknownSeekPolicyS3AOption() throws Throwable { + // fall back to the normal seek policy. + assertOpenFile(INPUT_FADVISE, "undefined") + .extracting(f -> f.getInputPolicy()) + .isEqualTo(INPUT_POLICY); + } + + /** + * The S3A option also supports a list of values. + */ + @Test + public void testSeekPolicyListS3AOption() throws Throwable { + // fall back to the normal seek policy. + assertOpenFile(INPUT_FADVISE, "hbase, random") + .extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Random); + } + + /** + * Verify that if a list of policies is supplied in a configuration, + * the first recognized policy will be adopted. + */ + @Test + public void testSeekPolicyExtractionFromList() throws Throwable { + String plist = "a, b, RandOm, other "; + Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist); + Collection<String> options = conf.getTrimmedStringCollection( + FS_OPTION_OPENFILE_READ_POLICY); + Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null)) + .describedAs("Policy from " + plist) + .isEqualTo(S3AInputPolicy.Random); + } + + @Test + public void testAdaptiveSeekPolicyRecognized() throws Throwable { + Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null)) + .describedAs("adaptive") + .isEqualTo(S3AInputPolicy.Normal); Review Comment: Since at the moment normal, default, and adaptive all map to normal, shouldn't we also verify if the boolean isAdaptive is also true in this case? ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java: ########## @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.ObjectAssert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; +import org.apache.hadoop.test.HadoopTestBase; + +import static java.util.Collections.singleton; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM; +import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; +import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Unit tests for {@link OpenFileSupport} and the associated + * seek policy lookup in {@link S3AInputPolicy}. + */ +public class TestOpenFileSupport extends HadoopTestBase { + + private static final ChangeDetectionPolicy CHANGE_POLICY = + ChangeDetectionPolicy.createPolicy( + ChangeDetectionPolicy.Mode.Server, + ChangeDetectionPolicy.Source.None, + false); + + private static final long READ_AHEAD_RANGE = 16; + + private static final String USERNAME = "hadoop"; + + public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential; + + public static final String TESTFILE = "s3a://bucket/name"; + + private static final Path TESTPATH = new Path(TESTFILE); + + /** + * Create a OpenFileSupport instance. + */ + private static final OpenFileSupport PREPARE = + new OpenFileSupport( + CHANGE_POLICY, + READ_AHEAD_RANGE, + USERNAME, + IO_FILE_BUFFER_SIZE_DEFAULT, + DEFAULT_ASYNC_DRAIN_THRESHOLD, + INPUT_POLICY); + + @Test + public void testSimpleFile() throws Throwable { + ObjectAssert<OpenFileSupport.OpenFileInformation> + asst = assertFileInfo( + PREPARE.openSimpleFile(1024)); + + asst.extracting(f -> f.getChangePolicy()) + .isEqualTo(CHANGE_POLICY); + asst.extracting(f -> f.getInputPolicy()) + .isEqualTo(INPUT_POLICY); + asst.extracting(f -> f.getReadAheadRange()) + .isEqualTo(READ_AHEAD_RANGE); + } + + /** + * Initiate an assert from an open file information instance. 
+ * @param fi file info + * @return an assert stream. + */ + private ObjectAssert<OpenFileSupport.OpenFileInformation> assertFileInfo( + final OpenFileSupport.OpenFileInformation fi) { + return Assertions.assertThat(fi) + .describedAs("File Information %s", fi); + } + + /** + * Create an assertion about the openFile information from a configuration + * with the given key/value option. + * @param key key to set. + * @param option option value. + * @return the constructed OpenFileInformation. + */ + public ObjectAssert<OpenFileSupport.OpenFileInformation> assertOpenFile( + final String key, + final String option) throws IOException { + return assertFileInfo(prepareToOpenFile(params(key, option))); + } + + @Test + public void testUnknownMandatoryOption() throws Throwable { + + String key = "unknown"; + intercept(IllegalArgumentException.class, key, () -> + prepareToOpenFile(params(key, "undefined"))); + } + + @Test + public void testSeekRandomIOPolicy() throws Throwable { + + // ask for random IO + String option = FS_OPTION_OPENFILE_READ_POLICY_RANDOM; + + // is picked up + assertOpenFile(INPUT_FADVISE, option) + .extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Random); + // and as neither status nor length was set: no file status + assertOpenFile(INPUT_FADVISE, option) + .extracting(f -> f.getStatus()) + .isNull(); + } + + /** + * There's a standard policy name. 'adaptive', + * meaning 'whatever this stream does to adapt to the client's use'. + * On the S3A connector that is mapped to {@link S3AInputPolicy#Normal}. + */ + @Test + public void testSeekPolicyAdaptive() throws Throwable { + + // when caller asks for adaptive, they get "normal" + assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE) + .extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Normal); + } + + /** + * Verify that an unknown seek policy falls back to + * {@link S3AInputPolicy#Normal}. 
+ */ + @Test + public void testUnknownSeekPolicyS3AOption() throws Throwable { + // fall back to the normal seek policy. + assertOpenFile(INPUT_FADVISE, "undefined") + .extracting(f -> f.getInputPolicy()) + .isEqualTo(INPUT_POLICY); + } + + /** + * The S3A option also supports a list of values. + */ + @Test + public void testSeekPolicyListS3AOption() throws Throwable { + // the first recognized policy in the list is picked up. + assertOpenFile(INPUT_FADVISE, "hbase, random") + .extracting(f -> f.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Random); + } + + /** + * Verify that if a list of policies is supplied in a configuration, + * the first recognized policy will be adopted. + */ + @Test + public void testSeekPolicyExtractionFromList() throws Throwable { + String plist = "a, b, RandOm, other "; Review Comment: Would be better to include valid policies in this list to show the first valid policy is picked up. ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java: ########## @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + @Test + public void testReadahead() throws Throwable { + // readahead range option + assertOpenFile(READAHEAD_RANGE, "4096") + .extracting(f -> f.getReadAheadRange()) + .isEqualTo(4096L); + } + + /** is + * Verify readahead range is picked up. + */ + @Test + public void testBufferSize() throws Throwable { + // readahead range option Review Comment: nit: bufferSize. ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java: ########## @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + @Test + public void testReadahead() throws Throwable { + // readahead range option + assertOpenFile(READAHEAD_RANGE, "4096") + .extracting(f -> f.getReadAheadRange()) + .isEqualTo(4096L); + } + + /** is + * Verify readahead range is picked up. + */ + @Test Review Comment: nit: Javadoc correction. ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.performance; + + +import java.io.EOFException; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.statistics.IOStatistics; + +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream; +import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED; +import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Cost of openFile(). 
+ */ +public class ITestS3AOpenCost extends AbstractS3ACostTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3AOpenCost.class); + + private Path testFile; + + private FileStatus testFileStatus; + + private long fileLength; + + public ITestS3AOpenCost() { + super(true); + } + + /** + * Setup creates a test file, saves its status and length + * to fields. + */ + @Override + public void setup() throws Exception { + super.setup(); + S3AFileSystem fs = getFileSystem(); + testFile = methodPath(); + + writeTextFile(fs, testFile, "openfile", true); + testFileStatus = fs.getFileStatus(testFile); + fileLength = testFileStatus.getLen(); + } + + /** + * Test whether openFile() performs GET requests when file status + * and length options are passed down. + * Note that the input streams only update the FS statistics + * in close(), so metrics cannot be verified until all operations + * on a stream are complete. + * This is slightly less than ideal. + */ + @Test + public void testOpenFileWithStatusOfOtherFS() throws Throwable { + describe("Test cost of openFile with/without status; raw only"); + S3AFileSystem fs = getFileSystem(); + + // now read that file back in using the openFile call, + // with a new FileStatus and a different path. + // this verifies that any FileStatus class/subclass is used + // as a source of the file length.
+ FileStatus st2 = new FileStatus( + fileLength, false, + testFileStatus.getReplication(), + testFileStatus.getBlockSize(), + testFileStatus.getModificationTime(), + testFileStatus.getAccessTime(), + testFileStatus.getPermission(), + testFileStatus.getOwner(), + testFileStatus.getGroup(), + new Path("gopher:///localhost/" + testFile.getName())); + + // no IO in open + FSDataInputStream in = verifyMetrics(() -> + fs.openFile(testFile) + .withFileStatus(st2) + .build() + .get(), + always(NO_IO), + with(STREAM_READ_OPENED, 0)); + + // the stream gets opened during read + long readLen = verifyMetrics(() -> + readStream(in), + always(NO_IO), + with(STREAM_READ_OPENED, 1)); + assertEquals("bytes read from file", fileLength, readLen); + } + + @Test + public void testOpenFileShorterLength() throws Throwable { + // do a second read with the length declared as short. + // we now expect the bytes read to be shorter. + S3AFileSystem fs = getFileSystem(); + + S3ATestUtils.MetricDiff bytesDiscarded = + new S3ATestUtils.MetricDiff(fs, STREAM_READ_BYTES_READ_CLOSE); + int offset = 2; + long shortLen = fileLength - offset; + // open the file + FSDataInputStream in2 = verifyMetrics(() -> + fs.openFile(testFile) + .must(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL) + .opt(FS_OPTION_OPENFILE_LENGTH, shortLen) + .build() + .get(), + always(NO_IO), + with(STREAM_READ_OPENED, 0)); + + // verify that the statistics are in rane Review Comment: nit: "range" ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java: ########## @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
+   */
+  @Test
+  public void testUnknownSeekPolicyS3AOption() throws Throwable {
+    // fall back to the normal seek policy.
+    assertOpenFile(INPUT_FADVISE, "undefined")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(INPUT_POLICY);
+  }
+
+  /**
+   * The S3A option also supports a list of values.
+   */
+  @Test
+  public void testSeekPolicyListS3AOption() throws Throwable {
+    // the first recognized policy in the list is used.
+    assertOpenFile(INPUT_FADVISE, "hbase, random")
+        .extracting(f -> f.getInputPolicy())
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  /**
+   * Verify that if a list of policies is supplied in a configuration,
+   * the first recognized policy will be adopted.
+   */
+  @Test
+  public void testSeekPolicyExtractionFromList() throws Throwable {
+    String plist = "a, b, RandOm, other ";
+    Configuration conf = conf(FS_OPTION_OPENFILE_READ_POLICY, plist);
+    Collection<String> options = conf.getTrimmedStringCollection(
+        FS_OPTION_OPENFILE_READ_POLICY);
+    Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
+        .describedAs("Policy from " + plist)
+        .isEqualTo(S3AInputPolicy.Random);
+  }
+
+  @Test
+  public void testAdaptiveSeekPolicyRecognized() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
+        .describedAs("adaptive")
+        .isEqualTo(S3AInputPolicy.Normal);
+  }
+
+  @Test
+  public void testUnknownSeekPolicyFallback() throws Throwable {
+    Assertions.assertThat(S3AInputPolicy.getPolicy("unknown", null))
+        .describedAs("unkown policy")

Review Comment:
   typo: "unknown"



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########

@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+
+import java.io.EOFException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Cost of openFile().
+ */
+public class ITestS3AOpenCost extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AOpenCost.class);
+
+  private Path testFile;
+
+  private FileStatus testFileStatus;
+
+  private long fileLength;
+
+  public ITestS3AOpenCost() {
+    super(true);
+  }
+
+  /**
+   * Setup creates a test file, saves its status and length
+   * to fields.
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    S3AFileSystem fs = getFileSystem();
+    testFile = methodPath();
+
+    writeTextFile(fs, testFile, "openfile", true);
+    testFileStatus = fs.getFileStatus(testFile);
+    fileLength = testFileStatus.getLen();
+  }
+
+  /**
+   * Test whether openFile() performs GET requests when file status
+   * and length options are passed down.
+   * Note that the input streams only update the FS statistics
+   * in close(), so metrics cannot be verified until all operations
+   * on a stream are complete.
+   * This is slightly less than ideal.
+   */
+  @Test
+  public void testOpenFileWithStatusOfOtherFS() throws Throwable {
+    describe("Test cost of openFile with/without status; raw only");
+    S3AFileSystem fs = getFileSystem();
+
+    // now read that file back in using the openFile call.
+    // with a new FileStatus and a different path.
+    // this verifies that any FileStatus class/subclass is used
+    // as a source of the file length.
+    FileStatus st2 = new FileStatus(
+        fileLength, false,
+        testFileStatus.getReplication(),
+        testFileStatus.getBlockSize(),
+        testFileStatus.getModificationTime(),
+        testFileStatus.getAccessTime(),
+        testFileStatus.getPermission(),
+        testFileStatus.getOwner(),
+        testFileStatus.getGroup(),
+        new Path("gopher:///localhost/" + testFile.getName()));
+
+    // no IO in open
+    FSDataInputStream in = verifyMetrics(() ->
+        fs.openFile(testFile)
+            .withFileStatus(st2)
+            .build()
+            .get(),
+        always(NO_IO),
+        with(STREAM_READ_OPENED, 0));
+
+    // the stream gets opened during read
+    long readLen = verifyMetrics(() ->
+        readStream(in),
+        always(NO_IO),

Review Comment:
   Little doubt here: When we read after opening the file, are we still on 0 IO?



Issue Time Tracking
-------------------

    Worklog Id:     (was: 755223)
    Time Spent: 17h 20m  (was: 17h 10m)

> Enhance openFile() for better read performance against object stores
> ---------------------------------------------------------------------
>
>                 Key: HADOOP-16202
>                 URL: https://issues.apache.org/jira/browse/HADOOP-16202
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs, fs/s3, tools/distcp
>    Affects Versions: 3.3.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 17h 20m
>  Remaining Estimate: 0h
>
> The {{openFile()}} builder API lets us add new options when reading a file.
>
> Add an option {{"fs.s3a.open.option.length"}} which takes a long and allows
> the length of the file to be declared. If set, *no check for the existence of
> the file is issued when opening the file*.
>
> Also: withFileStatus() to take any FileStatus implementation, rather than
> only S3AFileStatus -and not check that the path matches the path being
> opened. Needed to support viewFS-style wrapping and mounting.
> And adopt where appropriate, to stop clusters with S3A reads switched to
> random IO from killing download/localization:
> * fs shell copyToLocal
> * distcp
> * IOUtils.copy

--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org
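As a footnote to the thread: the `openFile()` shape under discussion — a builder that collects options and whose `build()` returns a future, so the actual open (and any existence check) can be deferred until the future is evaluated — can be sketched in plain Java. This is a minimal, dependency-free illustration of the pattern only; `OpenFileBuilder` and its option handling are invented stand-ins, not the real Hadoop `FutureDataInputStreamBuilder` API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch of a builder-style, lazily evaluated open call. */
public class LazyOpenSketch {

  /** Hypothetical stand-in for a file-opening builder. */
  static class OpenFileBuilder {
    private final String path;
    private final Map<String, String> options = new HashMap<>();

    OpenFileBuilder(String path) {
      this.path = path;
    }

    /** Record an optional long-valued option, returning this for chaining. */
    OpenFileBuilder opt(String key, long value) {
      options.put(key, Long.toString(value));
      return this;
    }

    /**
     * Return a future; the "open" work runs asynchronously, so
     * permission/existence checks could be deferred to evaluation time.
     */
    CompletableFuture<String> build() {
      return CompletableFuture.supplyAsync(() ->
          "opened " + path + " with " + options);
    }
  }

  public static void main(String[] args) throws Exception {
    String result = new OpenFileBuilder("s3a://bucket/name")
        .opt("fs.option.openfile.length", 1024L)
        .build()
        .get();   // the caller blocks here, not at build() time
    System.out.println(result);
  }
}
```

Declaring the length up front is what lets a real implementation skip the HEAD/existence probe entirely, which is the cost saving the JIRA description is after.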