[
https://issues.apache.org/jira/browse/HADOOP-16202?focusedWorklogId=756378&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756378
]
ASF GitHub Bot logged work on HADOOP-16202:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Apr/22 13:19
Start Date: 13/Apr/22 13:19
Worklog Time Spent: 10m
Work Description: dannycjones commented on code in PR #2584:
URL: https://github.com/apache/hadoop/pull/2584#discussion_r845064242
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java:
##########
@@ -136,56 +129,39 @@ private FutureIOSupport() {
* @param <U> type of builder
* @return the builder passed in.
*/
+ @Deprecated
Review Comment:
Why deprecate this method when other methods promoted to `FutureIO` are
happy without a deprecated flag?
Should we encourage Hadoop developers to move to `FutureIO` once promoted
from `FutureIOSupport`?
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java:
##########
@@ -187,10 +192,19 @@ public B opt(@Nonnull final String key, boolean value) {
@Override
public B opt(@Nonnull final String key, int value) {
mandatoryKeys.remove(key);
+ optionalKeys.add(key);
options.setInt(key, value);
return getThisBuilder();
}
+ @Override
+ public B opt(@Nonnull final String key, final long value) {
+ mandatoryKeys.remove(key);
+ optionalKeys.add(key);
+ options.setLong(key, value);
+ return getThisBuilder();
+ }
Review Comment:
JavaDoc?
```java
/**
* Set optional long parameter for the Builder.
*
* @see #opt(String, String)
*/
```
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java:
##########
@@ -53,6 +52,7 @@ private FutureIOSupport() {
/**
* Given a future, evaluate it. Raised exceptions are
* extracted and handled.
+ * See {@link FutureIO#awaitFuture(Future, long, TimeUnit)}.
Review Comment:
I think we want to reference the `awaitFuture` with only a future as arg?
```suggestion
* See {@link FutureIO#awaitFuture(Future)}.
```
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java:
##########
@@ -70,6 +70,10 @@ public Configuration createConfiguration() {
// use minimum multipart size for faster triggering
conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
+ // this is so stream draining is always blocking, allowing
+ // assertions to be safely made without worrying
+ // about any race conditions
+ conf.setInt(ASYNC_DRAIN_THRESHOLD, 128_000);
Review Comment:
Hoping to better understand why the change is needed - what did the race
conditions look like?
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java:
##########
@@ -409,22 +439,27 @@ public static boolean copy(FileSystem srcFS, FileStatus
srcStatus,
if (!dstFS.mkdirs(dst)) {
return false;
}
- FileStatus contents[] = srcFS.listStatus(src);
- for (int i = 0; i < contents.length; i++) {
- copy(srcFS, contents[i], dstFS,
- new Path(dst, contents[i].getPath().getName()),
- deleteSource, overwrite, conf);
+ RemoteIterator<FileStatus> contents = srcFS.listStatusIterator(src);
+ while (contents.hasNext()) {
+ FileStatus next = contents.next();
+ copy(srcFS, next, dstFS,
+ new Path(dst, next.getPath().getName()),
+ deleteSource, overwrite, conf);
}
} else {
- InputStream in=null;
+ InputStream in = null;
OutputStream out = null;
try {
- in = srcFS.open(src);
+ in = awaitFuture(srcFS.openFile(src)
+ .opt(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+ .opt(FS_OPTION_OPENFILE_LENGTH,
+ srcStatus.getLen()) // file length hint for object stores
Review Comment:
When should we use `FS_OPTION_OPENFILE_LENGTH` option vs.
`.withFileStatus(status)`?
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -110,13 +170,18 @@ custom subclasses.
This is critical to ensure safe use of the feature: directory listing/
status serialization/deserialization can result result in the
`withFileStatus()`
-argumennt not being the custom subclass returned by the Filesystem instance's
+argument not being the custom subclass returned by the Filesystem instance's
own `getFileStatus()`, `listFiles()`, `listLocatedStatus()` calls, etc.
In such a situation the implementations must:
-1. Validate the path (always).
-1. Use the status/convert to the custom type, *or* simply discard it.
+1. Verify that `status.getPath().getName()` matches the current
`path.getName()`
+ value. The rest of the path MUST NOT be validated.
+1. Use any status fields as desired -for example the file length.
+
+Even if not values of the status are used, the presence of the argument
Review Comment:
"none of the values"?
```suggestion
Even if none of the values of the status are used, the presence of the
argument
```
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be
evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
+```
+
+The inability to access/read a file MUST raise an `IOException`or subclass
Review Comment:
typo: missing space
```suggestion
The inability to access/read a file MUST raise an `IOException` or subclass
```
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -110,13 +170,18 @@ custom subclasses.
This is critical to ensure safe use of the feature: directory listing/
status serialization/deserialization can result result in the
`withFileStatus()`
-argumennt not being the custom subclass returned by the Filesystem instance's
+argument not being the custom subclass returned by the Filesystem instance's
Review Comment:
line above, `result result`
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
Review Comment:
```suggestion
Thus, even when the file does not exist, or it's a directory rather than a
file,
```
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be
evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
+```
+
+The inability to access/read a file MUST raise an `IOException`or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
+
+```java
+ future.get().read();
+```
+
+Access permission checks have the same visibility requirements: permission
failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent
operations.
+
+Note: some operations on the input stream, such as `seek()` may not attempt
any IO
+at all. Such operations MAY NOT raise exceotions when interacting with
Review Comment:
exceotions -> exceptions
```suggestion
at all. Such operations MAY NOT raise exceptions when interacting with
```
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be
evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
+```
+
+The inability to access/read a file MUST raise an `IOException`or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
+
+```java
+ future.get().read();
+```
+
+Access permission checks have the same visibility requirements: permission
failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent
operations.
+
+Note: some operations on the input stream, such as `seek()` may not attempt
any IO
+at all. Such operations MAY NOT raise exceotions when interacting with
+nonexistent/unreadable files.
+
+## <a name="options"></a> Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementation
+MUST expected to recognise and MAY support by changing the behavior of
+their input streams as a appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident
that
+the application will only be executed against releases of Hadoop which knows of
+the options -applications SHOULD set the options via `opt()` calls rather than
`must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
+
+If set as a `must()` parameter, unsupported "standard" options MUST be ignored.
+unrecognized standard options MUST be rejected.
+
+The standard `openFile()` options are defined
+in `org.apache.hadoop.fs.OpenFileOptions`; they all SHALL start
+with `fs.option.openfile.`.
+
+Note that while all `FileSystem`/`FileContext` instances SHALL support these
+options to the extent that `must()` declarations SHALL NOT fail, the
+implementations MAY support them to the extent of interpreting the values. This
+means that it is not a requirement for the stores to actually read the the read
+policy or file length values and use them when opening files.
+
+Unless otherwise stated, they SHOULD be viewed as hints.
+
+Note: if a standard option is added such that if set but not
+supported would be an error, then implementations SHALL reject it. For example,
+the S3A filesystem client supports the ability to push down SQL commands. If
+something like that were ever standardized, then the use of the option, either
+in `opt()` or `must()` argument MUST be rejected for filesystems which don't
+support the feature.
+
+### <a name="buffer.size"></a> Option: `fs.option.openfile.buffer.size`
+
+Read buffer size in bytes.
+
+This overrides the default value set in the configuration with the option
+`io.file.buffer.size`.
+
+It is supported by all filesystem clients which allow for stream-specific
buffer
+sizes to be set via `FileSystem.open(path, buffersize)`.
+
+### <a name="read.policy"></a> Option: `fs.option.openfile.read.policy`
+
+Declare the read policy of the input stream. This is a hint as to what the
+expected read pattern of an input stream will be. This MAY control readahead,
+buffering and other optimizations.
+
+Sequential reads may be optimized with prefetching data and/or reading data in
+larger blocks. Some applications (e.g. distCp) perform sequential IO even over
+columnar data.
+
+In contrast, random IO reads data in different parts of the file using a
+sequence of `seek()/read()`
+or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations
+
+Queries over columnar formats such as Apache ORC and Apache Parquet perform
such
+random IO; other data formats may be best read with sequential or whole-file
+policies.
+
+What is key is that optimizing reads for seqential reads may impair random
+performance -and vice versa.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the
+ filesystem MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem specific
+ behavior -and it may change with Hadoop releases and/or specific storage
+ subsystems.
+1. If a policy is not recognized, the filesystem client MUST ignore it.
+
+| Policy | Meaning |
+|--------------|----------------------------------------------------------|
+| `adaptive` | Any adaptive policy implemented by the store. |
+| `default` | The default policy for this store. Generally "adaptive". |
+| `random` | Optimize for random acdess. |
+| `sequential` | Optimize for sequential access. |
+| `vector` | The Vectored IO API is intended to be used. |
+| `whole-file` | The whole file will be read. |
+
+Choosing the wrong read policy for an input source may be inefficient.
+
+A list of read policies MAY be supplied; the first one recognized/supported by
+the filesystem SHALL be the one used. This allows for custom policies to be
+supported, for example an `hbase-hfile` policy optimized for HBase HFiles.
+
+The S3A and ABFS input streams both implement
+the [IOStatisticsSource](iostatistics.html) API, and can be queried for their
IO
+Performance.
+
+*Tip:* log the `toString()` value of input streams at `DEBUG`. The S3A and ABFS
+Input Streams log read statistics, which can provide insight about whether
reads
+are being performed efficiently or not.
+
+_Futher reading_
+
+* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
+* [Windows
`CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
+
+#### <a name="read.policy."></a> Read Policy `adaptive`
+
+Try to adapt the seek policy to the read pattern of the application.
+
+The `normal` policy of the S3A client and the sole policy supported by
+the `wasb:` client are both adaptive -they assume sequential IO, but once a
+backwards seek/positioned read call is made the stream switches to random IO.
+
+Other filesystem implementations may wish to adopt similar strategies, and/or
+extend the algorithms to detect forward seeks and/or switch from random to
+sequential IO if that is considered more efficient.
+
+Adaptive read policies are the absence of the ability to
+declare the seek policy in the `open()` API, so requiring it to be declared, if
+configurable, in the cluster/application configuration. However, the switch
from
+sequential to random seek policies may be exensive.
+
+When applications explicitly set the `fs.option.openfile.read.policy` option,
if
+they know their read plan, they SHOULD declare which policy is most
appropriate.
+
+#### <a name="read.policy.default"></a> Read Policy ``
+
+The default policy for the filesystem instance.
+Implementation/installation-specific.
+
+#### <a name="read.policy.sequential"></a> Read Policy `sequential`
+
+Expect sequential reads from the first byte read to the end of the file/until
+the stream is closed.
+
+#### <a name="read.policy.random"></a> Read Policy `random`
+
+Expect `seek()/read()` sequences, or use of `PositionedReadable`
+or `ByteBufferPositionedReadable` APIs.
+
+
+#### <a name="read.policy.vector"></a> Read Policy `vector`
+
+This declares that the caller intends to use the Vectored read API of
+[HADOOP-11867](https://issues.apache.org/jira/browse/HADOOP-11867)
+_Add a high-performance vectored read API_.
+
+This is a hint: it is not a requirement when using the API.
+It does inform the implemenations that the stream should be
Review Comment:
implemenations -> implementations
```suggestion
It does inform the implementations that the stream should be
```
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be
evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
+```
+
+The inability to access/read a file MUST raise an `IOException`or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
+
+```java
+ future.get().read();
+```
+
+Access permission checks have the same visibility requirements: permission
failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent
operations.
+
+Note: some operations on the input stream, such as `seek()` may not attempt
any IO
+at all. Such operations MAY NOT raise exceotions when interacting with
+nonexistent/unreadable files.
+
+## <a name="options"></a> Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementation
+MUST expected to recognise and MAY support by changing the behavior of
+their input streams as a appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident
that
+the application will only be executed against releases of Hadoop which knows of
+the options -applications SHOULD set the options via `opt()` calls rather than
`must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
+
+If set as a `must()` parameter, unsupported "standard" options MUST be ignored.
+unrecognized standard options MUST be rejected.
+
+The standard `openFile()` options are defined
+in `org.apache.hadoop.fs.OpenFileOptions`; they all SHALL start
+with `fs.option.openfile.`.
+
+Note that while all `FileSystem`/`FileContext` instances SHALL support these
+options to the extent that `must()` declarations SHALL NOT fail, the
+implementations MAY support them to the extent of interpreting the values. This
+means that it is not a requirement for the stores to actually read the the read
+policy or file length values and use them when opening files.
+
+Unless otherwise stated, they SHOULD be viewed as hints.
+
+Note: if a standard option is added such that if set but not
+supported would be an error, then implementations SHALL reject it. For example,
+the S3A filesystem client supports the ability to push down SQL commands. If
+something like that were ever standardized, then the use of the option, either
+in `opt()` or `must()` argument MUST be rejected for filesystems which don't
+support the feature.
+
+### <a name="buffer.size"></a> Option: `fs.option.openfile.buffer.size`
+
+Read buffer size in bytes.
+
+This overrides the default value set in the configuration with the option
+`io.file.buffer.size`.
+
+It is supported by all filesystem clients which allow for stream-specific
buffer
+sizes to be set via `FileSystem.open(path, buffersize)`.
+
+### <a name="read.policy"></a> Option: `fs.option.openfile.read.policy`
+
+Declare the read policy of the input stream. This is a hint as to what the
+expected read pattern of an input stream will be. This MAY control readahead,
+buffering and other optimizations.
+
+Sequential reads may be optimized with prefetching data and/or reading data in
+larger blocks. Some applications (e.g. distCp) perform sequential IO even over
+columnar data.
+
+In contrast, random IO reads data in different parts of the file using a
+sequence of `seek()/read()`
+or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations
+
+Queries over columnar formats such as Apache ORC and Apache Parquet perform
such
+random IO; other data formats may be best read with sequential or whole-file
+policies.
+
+What is key is that optimizing reads for seqential reads may impair random
+performance -and vice versa.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the
+ filesystem MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem specific
+ behavior -and it may change with Hadoop releases and/or specific storage
+ subsystems.
+1. If a policy is not recognized, the filesystem client MUST ignore it.
+
+| Policy | Meaning |
+|--------------|----------------------------------------------------------|
+| `adaptive` | Any adaptive policy implemented by the store. |
+| `default` | The default policy for this store. Generally "adaptive". |
+| `random` | Optimize for random acdess. |
Review Comment:
typo: access
```suggestion
| `random` | Optimize for random access. |
```
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md:
##########
@@ -0,0 +1,122 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+# `FileSystem.openFile()`/`FileContext.openFile()`
+
+This is a method provided by both FileSystem and FileContext for
+advanced file opening options and, where implemented,
+an asynchrounous/lazy opening of a file.
+
+Creates a builder to open a file, supporting options
+both standard and filesystem specific. The return
+value of the `build()` call is a `Future<FSDataInputStream>`,
+which must be waited on. The file opening may be
+asynchronous, and it may actually be postponed (including
+permission/existence checks) until reads are actually
+performed.
+
+This API call was added to `FileSystem` and `FileContext` in
+Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows.
+
+* Added `opt(key, long)` and `must(key, long)`.
+* Declared that `withFileStatus(null)` is allowed.
+* Declared that `withFileStatus(status)` only checks
+ the filename of the path, not the full path.
+ This is needed to support passthrough/mounted filesystems.
+* Added standard option keys.
+
+### <a name="openfile_path_"></a> `FutureDataInputStreamBuilder openFile(Path
path)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct a operation to open the file at `path` for reading.
+
+When `build()` is invoked on the returned `FutureDataInputStreamBuilder`
instance,
+the builder parameters are verified and
+`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or
+`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked.
+
+These protected methods returns a `CompletableFuture<FSDataInputStream>`
+which, when its `get()` method is called, either returns an input
+stream of the contents of opened file, or raises an exception.
+
+The base implementation of the `FileSystem.openFileWithOptions(PathHandle,
OpenFileParameters)`
+ultimately invokes `FileSystem.open(Path, int)`.
+
+Thus the chain `FileSystem.openFile(path).build().get()` has the same
preconditions
+and postconditions as `FileSystem.open(Path p, int bufferSize)`
+
+However, there is one difference which implementations are free to
+take advantage of:
+
+The returned stream MAY implement a lazy open where file non-existence or
+access permission failures may not surface until the first `read()` of the
+actual data.
+
+This saves network IO on object stores.
+
+The `openFile()` operation MAY check the state of the filesystem during its
+invocation, but as the state of the filesystem may change betwen this call and
+the actual `build()` and `get()` operations, this file-specific
+preconditions (file exists, file is readable, etc) MUST NOT be checked here.
+
+FileSystem implementations which do not implement `open(Path, int)`
+MAY postpone raising an `UnsupportedOperationException` until either the
+`FutureDataInputStreamBuilder.build()` or the subsequent `get()` call,
+else they MAY fail fast in the `openFile()` call.
+
+Consult [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html) for
details
+on how to use the builder, and for standard options which may be pssed in.
Review Comment:
typo: passed
```suggestion
on how to use the builder, and for standard options which may be passed in.
```
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java:
##########
@@ -104,7 +104,7 @@ public synchronized void parse(HistoryEventHandler handler)
* Only used for unit tests.
*/
@Private
- public synchronized void parse(EventReader reader, HistoryEventHandler
handler)
+ public synchronized void parse(EventReader reader, HistoryEventHandler
handler)
Review Comment:
nit: remove extra space
file doesn't appear changed if we drop it
##########
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java:
##########
@@ -62,14 +58,8 @@ public RecordReader<Text, Text>
createRecordReader(InputSplit genericSplit,
context.progress();
// Open the file and seek to the start of the split
- Path path = split.getPath();
- FileSystem fs = path.getFileSystem(conf);
- // open the file
- final FutureDataInputStreamBuilder builder = fs.openFile(path);
- FutureIOSupport.propagateOptions(builder, conf,
- MRJobConfig.INPUT_FILE_OPTION_PREFIX,
- MRJobConfig.INPUT_FILE_MANDATORY_PREFIX);
- FSDataInputStream in = FutureIOSupport.awaitFuture(builder.build());
+ FileSystem fs = split.getPath().getFileSystem(conf);
+ FSDataInputStream in = fs.open(split.getPath());
Review Comment:
Why are we changing this to use `FileSystem#open`?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -333,7 +341,7 @@ private void seekInStream(long targetPos, long length)
throws IOException {
streamStatistics.seekBackwards(diff);
// if the stream is in "Normal" mode, switch to random IO at this
// point, as it is indicative of columnar format IO
- if (inputPolicy.equals(S3AInputPolicy.Normal)) {
+ if (inputPolicy.isAdaptive()) {
LOG.info("Switching to Random IO seek policy");
setInputPolicy(S3AInputPolicy.Random);
}
Review Comment:
Update comment above as it's still referencing 'Normal' mode`?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java:
##########
@@ -296,6 +304,15 @@ public enum Statistic {
StreamStatisticNames.STREAM_READ_OPERATIONS,
"Count of read() operations in an input stream",
TYPE_COUNTER),
+ STREAM_READ_REMOTE_STREAM_ABORTED(
+ StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
+ "Count/duration of aborting a remote stream during stream IO",
+ TYPE_DURATION),
+ STREAM_READ_REMOTE_STREAM_CLOSED(
+ StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
+ "Count/duration of closing a remote stream during stream IO",
+ TYPE_DURATION),
Review Comment:
They are always durations, never count?
```suggestion
STREAM_READ_REMOTE_STREAM_ABORTED(
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
"Duration of aborting a remote stream during stream IO",
TYPE_DURATION),
STREAM_READ_REMOTE_STREAM_CLOSED(
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
"Duration of closing a remote stream during stream IO",
TYPE_DURATION),
```
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java:
##########
@@ -18,59 +18,110 @@
package org.apache.hadoop.fs.s3a;
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Locale;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Locale;
-import static org.apache.hadoop.fs.s3a.Constants.*;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
/**
- * Filesystem input policy.
+ * Stream input policy.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public enum S3AInputPolicy {
- Normal(INPUT_FADV_NORMAL),
- Sequential(INPUT_FADV_SEQUENTIAL),
- Random(INPUT_FADV_RANDOM);
+ Normal(FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, false, true),
+ Random(FS_OPTION_OPENFILE_READ_POLICY_RANDOM, false, false),
+ Sequential(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, true, false);
Review Comment:
I think the second arg was flipped but not updated.
```suggestion
Random(FS_OPTION_OPENFILE_READ_POLICY_RANDOM, true, false),
Sequential(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, false, false);
```
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be
evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
+```
+
+The inability to access/read a file MUST raise an `IOException`or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
+
+```java
+ future.get().read();
+```
+
+Access permission checks have the same visibility requirements: permission
failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent
operations.
+
+Note: some operations on the input stream, such as `seek()` may not attempt
any IO
+at all. Such operations MAY NOT raise exceotions when interacting with
+nonexistent/unreadable files.
+
+## <a name="options"></a> Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementation
+MUST expected to recognise and MAY support by changing the behavior of
+their input streams as a appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident
that
+the application will only be executed against releases of Hadoop which knows of
+the options -applications SHOULD set the options via `opt()` calls rather than
`must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
+
+If set as a `must()` parameter, unsupported "standard" options MUST be ignored.
+unrecognized standard options MUST be rejected.
+
+The standard `openFile()` options are defined
+in `org.apache.hadoop.fs.OpenFileOptions`; they all SHALL start
+with `fs.option.openfile.`.
+
+Note that while all `FileSystem`/`FileContext` instances SHALL support these
+options to the extent that `must()` declarations SHALL NOT fail, the
+implementations MAY support them to the extent of interpreting the values. This
+means that it is not a requirement for the stores to actually read the the read
+policy or file length values and use them when opening files.
+
+Unless otherwise stated, they SHOULD be viewed as hints.
+
+Note: if a standard option is added such that if set but not
+supported would be an error, then implementations SHALL reject it. For example,
+the S3A filesystem client supports the ability to push down SQL commands. If
+something like that were ever standardized, then the use of the option, either
+in `opt()` or `must()` argument MUST be rejected for filesystems which don't
+support the feature.
+
+### <a name="buffer.size"></a> Option: `fs.option.openfile.buffer.size`
+
+Read buffer size in bytes.
+
+This overrides the default value set in the configuration with the option
+`io.file.buffer.size`.
+
+It is supported by all filesystem clients which allow for stream-specific
buffer
+sizes to be set via `FileSystem.open(path, buffersize)`.
+
+### <a name="read.policy"></a> Option: `fs.option.openfile.read.policy`
+
+Declare the read policy of the input stream. This is a hint as to what the
+expected read pattern of an input stream will be. This MAY control readahead,
+buffering and other optimizations.
+
+Sequential reads may be optimized with prefetching data and/or reading data in
+larger blocks. Some applications (e.g. distCp) perform sequential IO even over
+columnar data.
+
+In contrast, random IO reads data in different parts of the file using a
+sequence of `seek()/read()`
+or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations
+
+Queries over columnar formats such as Apache ORC and Apache Parquet perform
such
+random IO; other data formats may be best read with sequential or whole-file
+policies.
+
+What is key is that optimizing reads for seqential reads may impair random
+performance -and vice versa.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the
+ filesystem MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem specific
+ behavior -and it may change with Hadoop releases and/or specific storage
+ subsystems.
+1. If a policy is not recognized, the filesystem client MUST ignore it.
+
+| Policy | Meaning |
+|--------------|----------------------------------------------------------|
+| `adaptive` | Any adaptive policy implemented by the store. |
+| `default` | The default policy for this store. Generally "adaptive". |
+| `random` | Optimize for random acdess. |
+| `sequential` | Optimize for sequential access. |
+| `vector` | The Vectored IO API is intended to be used. |
+| `whole-file` | The whole file will be read. |
+
+Choosing the wrong read policy for an input source may be inefficient.
+
+A list of read policies MAY be supplied; the first one recognized/supported by
+the filesystem SHALL be the one used. This allows for custom policies to be
+supported, for example an `hbase-hfile` policy optimized for HBase HFiles.
+
+The S3A and ABFS input streams both implement
+the [IOStatisticsSource](iostatistics.html) API, and can be queried for their
IO
+Performance.
+
+*Tip:* log the `toString()` value of input streams at `DEBUG`. The S3A and ABFS
+Input Streams log read statistics, which can provide insight about whether
reads
+are being performed efficiently or not.
+
+_Futher reading_
+
+* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
+* [Windows
`CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
+
+#### <a name="read.policy."></a> Read Policy `adaptive`
+
+Try to adapt the seek policy to the read pattern of the application.
+
+The `normal` policy of the S3A client and the sole policy supported by
+the `wasb:` client are both adaptive -they assume sequential IO, but once a
+backwards seek/positioned read call is made the stream switches to random IO.
+
+Other filesystem implementations may wish to adopt similar strategies, and/or
+extend the algorithms to detect forward seeks and/or switch from random to
+sequential IO if that is considered more efficient.
+
+Adaptive read policies are the absence of the ability to
+declare the seek policy in the `open()` API, so requiring it to be declared, if
+configurable, in the cluster/application configuration. However, the switch
from
+sequential to random seek policies may be exensive.
+
+When applications explicitly set the `fs.option.openfile.read.policy` option,
if
+they know their read plan, they SHOULD declare which policy is most
appropriate.
+
+#### <a name="read.policy.default"></a> Read Policy ``
+
+The default policy for the filesystem instance.
+Implementation/installation-specific.
+
+#### <a name="read.policy.sequential"></a> Read Policy `sequential`
+
+Expect sequential reads from the first byte read to the end of the file/until
+the stream is closed.
+
+#### <a name="read.policy.random"></a> Read Policy `random`
+
+Expect `seek()/read()` sequences, or use of `PositionedReadable`
+or `ByteBufferPositionedReadable` APIs.
+
+
+#### <a name="read.policy.vector"></a> Read Policy `vector`
+
+This declares that the caller intends to use the Vectored read API of
+[HADOOP-11867](https://issues.apache.org/jira/browse/HADOOP-11867)
+_Add a high-performance vectored read API_.
+
+This is a hint: it is not a requirement when using the API.
+It does inform the implemenations that the stream should be
+configured for optimal vectored IO performance, if such a
+feature has been implemented.
+
+It is *not* exclusive: the same stream may still be used for
+classic `InputStream` and `PositionedRead` API calls.
+Implementations SHOULD use the `random` read policy
+with these operations.
+
+#### <a name="read.policy.whole-file"></a> Read Policy `whole-file`
+
+
+This declares that the whole file is to be read end-to-end; the file system
client is free to enable
+whatever strategies maximise performance for this. In particular, larger
ranged reads/GETs can
+deliver high bandwidth by reducing socket/TLS setup costs and providing a
connection long-lived
+enough for TCP flow control to determine the optimal download rate.
+
+Strategies can include:
+
+* Initiate an HTTP GET of the entire file in `openFile()` operation.
+* Prefech data in large blocks, possibly in parallel read operations.
+
+Applications which know that the entire file is to be read from an opened
stream SHOULD declare this
+read policy.
+
+### <a name="openfile.length"></a> Option: `fs.option.openfile.length`
+
+Declare the length of a file.
+
+This can be used by clients to skip querying a remote store for the size
+of/existence of a file when opening it, similar to declaring a file status
+through the `withFileStatus()` option.
+
+If supported by a filesystem connector, this option MUST be interpreted as
+declaring the minimum length of the file:
+
+1. If the value is negative, the option SHALL be considered unset.
+2. It SHALL NOT be an error if the actual length of the file is greater than
+ this value.
+3. `read()`, `seek()` and positioned read calls MAY use a position
across/beyond
+ this length but below the actual length of the file. Implementations MAY
+ raise `EOFExceptions` in such cases, or they MAY return data.
+
+If this option is used by the FileSystem implementation
+
+*Implementor's Notes*
+
+* A value of `fs.option.openfile.length` < 0 MUST be rejected.
+* If a file status is supplied along with a value in `fs.opt.openfile.length`;
+ the file status values take precedence.
+
+### <a name="split.start"></a> Options: `fs.option.openfile.` and
`fs.option.openfile.split.end`
+
+Declare the start and end of the split when a file has been split for
processing
+in pieces.
+
+1. If a value is negative, the option SHALL be considered unset.
+1. Filesystems MAY assume that the length of the file is greater than or equal
+ to the value of `fs.option.openfile.split.end`.
+1. And that they MAY raise an exception if the client application reads past
the
+ value set in `fs.option.openfile.split.end`.
+1. The pair of options MAY be used to optimise the read plan, such as setting
+ the content range for GET requests, or using the split end as an implicit
+ declaration of the guaranteed minimum length of the file.
+1. If both options are set, and the split start is declared as greater than the
+ split end, then the split start SHOULD just be reset to zero, rather than
+ rejecting the operation.
+
+The split end value can provide a hint as to the end of the input stream. The
+split start can be used to optimize any initial read offset for filesystem
+clients.
+
+*Note for implementors: applications will read past the end of a split when
they
+need to read to the end of a record/line which begins before the end of the
+split.
+
+Therefore clients MUST be allowed to `seek()`/`read()` past the length
+set in `fs.option.openfile.split.end` if the file is actually longer
+than that value.
+
+## <a name="s3a"></a> S3A-specific options
+
+The S3A Connector supports custom options for readahead and seek policy.
+
+| Name | Type | Meaning
|
+|--------------------------------------|----------|-------------------------------------------------------------|
+| `fs.s3a.readahead.range` | `long` | readahead range in bytes
|
+| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to
asynchronous draining of the stream |
+| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by
`fs.option.openfile.read.policy` |
+
+If the option set contains a SQL statement in the `fs.s3a.select.sql`
statement,
+then the file is opened as an S3 Select query.
+Consult the S3A documentation for more details.
+
+## <a name="abfs"></a> ABFS-specific options
+
+The ABFS Connector supports custom input stream options.
+
+| Name | Type | Meaning
|
+|-----------------------------------|-----------|----------------------------------------------------|
+| `fs.azure.buffered.pread.disable` | `boolean` | disable caching on the
positioned read operations. |
+
+
+Disables caching on data read through the
[PositionedReadable](fsdatainputstream.html#PositionedReadable)
+APIs.
+
+Consult the ABFS Documentation for more details.
+
+## <a name="examples"></a> Examples
+
+#### Declaring seek policy and split limits when opening a file.
+
+Here is an example from a proof of
+concept `org.apache.parquet.hadoop.util.HadoopInputFile`
+reader which uses a (nullable) file status and a split start/end.
+
+The `FileStatus` value is always passed in -but if it is null, then the split
+end is used to declare the length of the file.
+
+```java
+protected SeekableInputStream newStream(Path path, FileStatus stat,
+ long splitStart, long splitEnd)
+ throws IOException {
+
+ FutureDataInputStreamBuilder builder = fs.openFile(path)
+ .opt("fs.option.openfile.read.policy", "vector, random")
+ .withFileStatus(stat);
+
+ builder.opt("fs.option.openfile.split.start", splitStart);
+ builder.opt("fs.option.openfile.split.end", splitEnd);
+ CompletableFuture<FSDataInputStream> streamF = builder.build();
+ return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
+}
```
-The preconditions for opening the file are checked during the asynchronous
-evaluation, and so will surface when the future is completed:
+As a result, whether driven directly by a file listing, or when opening a file
+from a query plan of `(path, splitStart, splitEnd)`, there is no need to probe
+the remote store for the length of the file. When working with remote object
+stores, this can save tens to hundreds of milliseconds, even if such a probe is
+done asynchronously.
+
+If both the file length and the split end is set, then the file length MUST be
+considered "more" authoritative, that is it really SHOULD be defining the file
+length. If the split end is set, the caller MAY ot read past it.
+
+The `CompressedSplitLineReader` can read past the end of a split if it is
+partway through processing a compressed record. That is: it assumes an
+incomplete record read means that the file length is greater than the split
+length, and that it MUST read the entirety of the partially read record. Other
+readers may behave similarly.
+
+Therefore
+
+1. File length as supplied in a `FileStatus` or in `fs.option.openfile.length`
+ SHALL set the strict upper limit on the length of a file
+2. The split end as set in `fs.option.openfile.split.end` MUST be viewed as a
+ hint, rather than the strict end of the file.
+
+### Opening a file with both standard and non-standard options
+
+Standard and non-standard options MAY be combined in the same `openFile()`
+operation.
+
+```java
+Future<FSDataInputStream> f = openFile(path)
+ .must("fs.option.openfile.read.policy", "random, adaptive")
+ .opt("fs.s3a.readahead.range", 1024 * 1024)
+ .build();
+
+FSDataInputStream is = f.get();
+```
+
+The option set in `must()` MUST be understood, or at least recognized and
+ignored by all filesystems. In this example, S3A-specific option MAY be
+ignored by all other filesystem clients.
+
+### Opening a file with older releases
+
+Not all hadoop releases recognize the `fs.option.openfile.read.policy` option.
+
+The option can be safely used in application code if it is added via the
`opt()`
+builder argument, as it will be treated as an unknown optional key which can
+then be discarded.
+
+```java
+Future<FSDataInputStream> f = openFile(path)
+ .opt("fs.option.openfile.read.policy", "vector, random, adaptive")
+ .build();
+
+FSDataInputStream is = f.get();
+```
+
+*Note 1* if the option name is set by a reference to a constant in
+`org.apache.hadoop.fs.Options.OpenFileOptions`, then the program will not link
+against versions of Hadoop without the specific option. Therefore for resilient
+linking against older releases -use a copy of the value.
+
+*Note 2* as option validation is performed in the FileSystem connector,
+a third-party connector designed to work with multiple hadoop versions
+MAY NOT support the option.
+
+### Passing options in to MapReduce
+
+Hadoop MapReduce will automatically read MR Job Options with the prefixes
+`mapreduce.job.input.file.option.` and `mapreduce.job.input.file.must.`
+prefixes, and apply these values as `.opt()` and `must()` respectively, after
+remove the mapreduce-specific prefixes.
+
+This makes passing options in to MR jobs straightforward. For example, to
+declare that a job should read its data using random IO:
+
+```java
+JobConf jobConf = (JobConf) job.getConfiguration()
+jobConf.set(
+ "mapreduce.job.input.file.option.fs.option.openfile.read.policy",
+ "random");
+```
+
+### MapReduce input format propagating options
+
+An example of a record reader passing in options to the file it opens.
```java
-FSDataInputStream in = future.get();
+ public void initialize(InputSplit genericSplit,
+ TaskAttemptContext context) throws IOException {
+ FileSplit split = (FileSplit)genericSplit;
+ Configuration job = context.getConfiguration();
+ start = split.getStart();
+ end = start + split.getLength();
+ Path file = split.getPath();
+
+ // open the file and seek to the start of the split
+ FutureDataInputStreamBuilder builder =
+ file.getFileSystem(job).openFile(file);
+ // the start and end of the split may be used to build
+ // an input strategy.
+ builder.opt("fs.option.openfile.split.start", start);
+ builder.opt("fs.option.openfile.split.end", end);
+ FutureIO.propagateOptions(builder, job,
+ "mapreduce.job.input.file.option",
+ "mapreduce.job.input.file.must");
+
+ fileIn = FutureIO.awaitFuture(builder.build());
+ fileIn.seek(start)
+ /* Rest of the operation on the opened stream */
+ }
```
+
+### `FileContext.openFile`
+
+From `org.apache.hadoop.fs.AvroFSInput`; a file is opened with sequential
input.
+Because the file length has already been probed for, the length is passd down
Review Comment:
passd -> passed
```suggestion
Because the file length has already been probed for, the length is passed
down.
```
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/openfile.md:
##########
@@ -0,0 +1,122 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+# `FileSystem.openFile()`/`FileContext.openFile()`
+
+This is a method provided by both FileSystem and FileContext for
+advanced file opening options and, where implemented,
+an asynchrounous/lazy opening of a file.
+
+Creates a builder to open a file, supporting options
+both standard and filesystem specific. The return
+value of the `build()` call is a `Future<FSDataInputStream>`,
+which must be waited on. The file opening may be
+asynchronous, and it may actually be postponed (including
+permission/existence checks) until reads are actually
+performed.
+
+This API call was added to `FileSystem` and `FileContext` in
+Hadoop 3.3.0; it was tuned in Hadoop 3.3.1 as follows.
+
+* Added `opt(key, long)` and `must(key, long)`.
+* Declared that `withFileStatus(null)` is allowed.
+* Declared that `withFileStatus(status)` only checks
+ the filename of the path, not the full path.
+ This is needed to support passthrough/mounted filesystems.
+* Added standard option keys.
+
+### <a name="openfile_path_"></a> `FutureDataInputStreamBuilder openFile(Path
path)`
+
+Creates a [`FutureDataInputStreamBuilder`](fsdatainputstreambuilder.html)
+to construct a operation to open the file at `path` for reading.
+
+When `build()` is invoked on the returned `FutureDataInputStreamBuilder`
instance,
+the builder parameters are verified and
+`FileSystem.openFileWithOptions(Path, OpenFileParameters)` or
+`AbstractFileSystem.openFileWithOptions(Path, OpenFileParameters)` invoked.
+
+These protected methods returns a `CompletableFuture<FSDataInputStream>`
+which, when its `get()` method is called, either returns an input
+stream of the contents of opened file, or raises an exception.
+
+The base implementation of the `FileSystem.openFileWithOptions(PathHandle,
OpenFileParameters)`
+ultimately invokes `FileSystem.open(Path, int)`.
+
+Thus the chain `FileSystem.openFile(path).build().get()` has the same
preconditions
+and postconditions as `FileSystem.open(Path p, int bufferSize)`
+
+However, there is one difference which implementations are free to
+take advantage of:
+
+The returned stream MAY implement a lazy open where file non-existence or
+access permission failures may not surface until the first `read()` of the
+actual data.
+
+This saves network IO on object stores.
+
+The `openFile()` operation MAY check the state of the filesystem during its
+invocation, but as the state of the filesystem may change betwen this call and
Review Comment:
```suggestion
invocation, but as the state of the filesystem may change between this call
and
```
Was already misspelt in source
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+
+import java.io.EOFException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
+import static
org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Cost of openFile().
+ */
+public class ITestS3AOpenCost extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3AOpenCost.class);
+
+ private Path testFile;
+
+ private FileStatus testFileStatus;
+
+ private long fileLength;
+
+ public ITestS3AOpenCost() {
+ super(true);
+ }
+
+ /**
+ * Setup creates a test file, saves is status and length
+ * to fields.
+ */
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ S3AFileSystem fs = getFileSystem();
+ testFile = methodPath();
+
+ writeTextFile(fs, testFile, "openfile", true);
+ testFileStatus = fs.getFileStatus(testFile);
+ fileLength = testFileStatus.getLen();
+ }
+
+ /**
+ * Test when openFile() performs GET requests when file status
+ * and length options are passed down.
+ * Note that the input streams only update the FS statistics
+ * in close(), so metrics cannot be verified until all operations
+ * on a stream are complete.
+ * This is slightly less than ideal.
+ */
+ @Test
+ public void testOpenFileWithStatusOfOtherFS() throws Throwable {
+ describe("Test cost of openFile with/without status; raw only");
+ S3AFileSystem fs = getFileSystem();
+
+ // now read that file back in using the openFile call.
+ // with a new FileStatus and a different path.
+ // this verifies that any FileStatus class/subclass is used
+ // as a source of the file length.
+ FileStatus st2 = new FileStatus(
+ fileLength, false,
+ testFileStatus.getReplication(),
+ testFileStatus.getBlockSize(),
+ testFileStatus.getModificationTime(),
+ testFileStatus.getAccessTime(),
+ testFileStatus.getPermission(),
+ testFileStatus.getOwner(),
+ testFileStatus.getGroup(),
+ new Path("gopher:///localhost/" + testFile.getName()));
+
+ // no IO in open
+ FSDataInputStream in = verifyMetrics(() ->
+ fs.openFile(testFile)
+ .withFileStatus(st2)
+ .build()
+ .get(),
+ always(NO_IO),
+ with(STREAM_READ_OPENED, 0));
+
+ // the stream gets opened during read
+ long readLen = verifyMetrics(() ->
+ readStream(in),
+ always(NO_IO),
+ with(STREAM_READ_OPENED, 1));
+ assertEquals("bytes read from file", fileLength, readLen);
+ }
+
+ @Test
+ public void testOpenFileShorterLength() throws Throwable {
+ // do a second read with the length declared as short.
+ // we now expect the bytes read to be shorter.
+ S3AFileSystem fs = getFileSystem();
+
+ S3ATestUtils.MetricDiff bytesDiscarded =
+ new S3ATestUtils.MetricDiff(fs, STREAM_READ_BYTES_READ_CLOSE);
+ int offset = 2;
+ long shortLen = fileLength - offset;
+ // open the file
+ FSDataInputStream in2 = verifyMetrics(() ->
+ fs.openFile(testFile)
+ .must(FS_OPTION_OPENFILE_READ_POLICY,
+ FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+ .opt(FS_OPTION_OPENFILE_LENGTH, shortLen)
+ .build()
+ .get(),
+ always(NO_IO),
+ with(STREAM_READ_OPENED, 0));
+
+ // verify that the statistics are in range
+ IOStatistics ioStatistics = extractStatistics(in2);
+ Object statsString = demandStringifyIOStatistics(ioStatistics);
+ LOG.info("Statistics of open stream {}", statsString);
+ verifyStatisticCounterValue(ioStatistics, ACTION_FILE_OPENED, 1);
+ // no network IO happened, duration is 0. There's a very small
+ // risk of some delay making it positive just from scheduling delays
+ assertDurationRange(ioStatistics, ACTION_FILE_OPENED, 0, 0);
+ // now read it
+ long r2 = verifyMetrics(() ->
+ readStream(in2),
+ always(NO_IO),
+ with(STREAM_READ_OPENED, 1),
+ with(STREAM_READ_BYTES_READ_CLOSE, 0),
+ with(STREAM_READ_SEEK_BYTES_SKIPPED, 0));
+
+ LOG.info("Statistics of read stream {}", statsString);
+
+ assertEquals("bytes read from file", shortLen, r2);
+ // the read has been ranged
+ bytesDiscarded.assertDiffEquals(0);
+ }
+
+ @Test
+ public void testOpenFileLongerLength() throws Throwable {
+ // do a second read with the length declared as short.
+ // we now expect the bytes read to be shorter.
Review Comment:
Comment needs updating for this test?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -4868,35 +4874,31 @@ private void requireSelectSupport(final Path source)
throws
* Extract the status from the optional parameter, querying
* S3 if it is absent.
* @param path path of the status
- * @param optStatus optional status
+ * @param fileInformation information on the file to open
* @return a file status
* @throws FileNotFoundException if there is no normal file at that path
* @throws IOException IO failure
*/
private S3AFileStatus extractOrFetchSimpleFileStatus(
- final Path path, final Optional<S3AFileStatus> optStatus)
+ final Path path,
+ final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException {
- S3AFileStatus fileStatus;
- if (optStatus.isPresent()) {
- fileStatus = optStatus.get();
+ S3AFileStatus fileStatus = fileInformation.getStatus();
+ if (fileStatus == null) {
// we check here for the passed in status
// being a directory
- if (fileStatus.isDirectory()) {
- throw new FileNotFoundException(path.toString() + " is a directory");
- }
- } else {
Review Comment:
This comment is no longer accurate, I think? It belongs with line 4893 (or
we can drop the comment).
```
// we check here for the passed in status
// being a directory
```
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+
+import java.io.EOFException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
+import static
org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
+import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_IO;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Cost of openFile().
+ */
+public class ITestS3AOpenCost extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3AOpenCost.class);
+
+ private Path testFile;
+
+ private FileStatus testFileStatus;
+
+ private long fileLength;
+
+ public ITestS3AOpenCost() {
+ super(true);
+ }
+
+ /**
+ * Setup creates a test file, saves is status and length
+ * to fields.
+ */
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ S3AFileSystem fs = getFileSystem();
+ testFile = methodPath();
+
+ writeTextFile(fs, testFile, "openfile", true);
+ testFileStatus = fs.getFileStatus(testFile);
+ fileLength = testFileStatus.getLen();
+ }
+
+ /**
+ * Test when openFile() performs GET requests when file status
+ * and length options are passed down.
+ * Note that the input streams only update the FS statistics
+ * in close(), so metrics cannot be verified until all operations
+ * on a stream are complete.
+ * This is slightly less than ideal.
+ */
+ @Test
+ public void testOpenFileWithStatusOfOtherFS() throws Throwable {
+ describe("Test cost of openFile with/without status; raw only");
+ S3AFileSystem fs = getFileSystem();
+
+ // now read that file back in using the openFile call.
+ // with a new FileStatus and a different path.
+ // this verifies that any FileStatus class/subclass is used
+ // as a source of the file length.
+ FileStatus st2 = new FileStatus(
+ fileLength, false,
+ testFileStatus.getReplication(),
+ testFileStatus.getBlockSize(),
+ testFileStatus.getModificationTime(),
+ testFileStatus.getAccessTime(),
+ testFileStatus.getPermission(),
+ testFileStatus.getOwner(),
+ testFileStatus.getGroup(),
+ new Path("gopher:///localhost/" + testFile.getName()));
+
+ // no IO in open
+ FSDataInputStream in = verifyMetrics(() ->
+ fs.openFile(testFile)
+ .withFileStatus(st2)
+ .build()
+ .get(),
+ always(NO_IO),
+ with(STREAM_READ_OPENED, 0));
+
+ // the stream gets opened during read
+ long readLen = verifyMetrics(() ->
+ readStream(in),
+ always(NO_IO),
Review Comment:
I didn't understand here - what do we mean by `NO_IO`? We are reading all of
the stream, right?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OpenFileSupport.java:
##########
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
+import org.apache.hadoop.fs.s3a.select.SelectConstants;
+
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
+import static
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
+import static
org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys;
+import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Helper class for openFile() logic, especially processing file status
+ * args and length/etag/versionID.
+ * <p>
+ * This got complex enough it merited removal from S3AFileSystem -which
+ * also permits unit testing.
+ * </p>
+ * <p>
+ * The default values are those from the FileSystem configuration.
+ * in openFile(), they can all be changed by specific options;
+ * in FileSystem.open(path, buffersize) only the buffer size is
+ * set.
+ * </p>
+ */
+public class OpenFileSupport {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OpenFileSupport.class);
+
+ /**
+ * For use when a value of an split/file length is unknown.
+ */
+ private static final int LENGTH_UNKNOWN = -1;
+
+ /** Default change detection policy. */
+ private final ChangeDetectionPolicy changePolicy;
+
+ /** Default read ahead range. */
+ private final long defaultReadAhead;
+
+ /** Username. */
+ private final String username;
+
+ /** Default buffer size. */
+ private final int defaultBufferSize;
+
+ /**
+ * Threshold for stream reads to switch to
+ * asynchronous draining.
+ */
+ private final long defaultAsyncDrainThreshold;
+
+ /**
+ * Default input policy; may be overridden in
+ * {@code openFile()}
+ */
+ private final S3AInputPolicy defaultInputPolicy;
+
+ /**
+ * Instantiate with the default options from the filesystem.
+ * @param changePolicy change detection policy
+ * @param defaultReadAhead read ahead range
+ * @param username username
+ * @param defaultBufferSize buffer size
+ * @param defaultAsyncDrainThreshold drain threshold
+ * @param defaultInputPolicy input policy
+ */
+ public OpenFileSupport(
+ final ChangeDetectionPolicy changePolicy,
+ final long defaultReadAhead,
+ final String username,
+ final int defaultBufferSize,
+ final long defaultAsyncDrainThreshold,
+ final S3AInputPolicy defaultInputPolicy) {
+ this.changePolicy = changePolicy;
+ this.defaultReadAhead = defaultReadAhead;
+ this.username = username;
+ this.defaultBufferSize = defaultBufferSize;
+ this.defaultAsyncDrainThreshold = defaultAsyncDrainThreshold;
+ this.defaultInputPolicy = defaultInputPolicy;
+ }
+
+ public ChangeDetectionPolicy getChangePolicy() {
+ return changePolicy;
+ }
+
+ public long getDefaultReadAhead() {
+ return defaultReadAhead;
+ }
+
+ public int getDefaultBufferSize() {
+ return defaultBufferSize;
+ }
+
+ public long getDefaultAsyncDrainThreshold() {
+ return defaultAsyncDrainThreshold;
+ }
+
+ /**
+ * Propagate the default options to the operation context
+ * being built up.
+ * @param roc context
+ * @return the context
+ */
+ public S3AReadOpContext applyDefaultOptions(S3AReadOpContext roc) {
+ return roc
+ .withInputPolicy(defaultInputPolicy)
+ .withChangeDetectionPolicy(changePolicy)
+ .withAsyncDrainThreshold(defaultAsyncDrainThreshold)
+ .withReadahead(defaultReadAhead);
+ }
+
+ /**
+ * Prepare to open a file from the openFile parameters.
+ * @param path path to the file
+ * @param parameters open file parameters from the builder.
+ * @param blockSize for fileStatus
+ * @return open file options
+ * @throws IOException failure to resolve the link.
+ * @throws IllegalArgumentException unknown mandatory key
+ */
+ @SuppressWarnings("ChainOfInstanceofChecks")
+ public OpenFileInformation prepareToOpenFile(
+ final Path path,
+ final OpenFileParameters parameters,
+ final long blockSize) throws IOException {
+ Configuration options = parameters.getOptions();
+ Set<String> mandatoryKeys = parameters.getMandatoryKeys();
+ String sql = options.get(SelectConstants.SELECT_SQL, null);
+ boolean isSelect = sql != null;
+ // choice of keys depends on open type
+ if (isSelect) {
+ // S3 Select call adds a large set of supported mandatory keys
+ rejectUnknownMandatoryKeys(
+ mandatoryKeys,
+ InternalSelectConstants.SELECT_OPTIONS,
+ "for " + path + " in S3 Select operation");
+ } else {
+ rejectUnknownMandatoryKeys(
+ mandatoryKeys,
+ InternalConstants.S3A_OPENFILE_KEYS,
+ "for " + path + " in non-select file I/O");
+ }
+
+ // where does a read end?
+ long fileLength = LENGTH_UNKNOWN;
+
+ // was a status passed in via a withStatus() invocation in
+ // the builder API?
+ FileStatus providedStatus = parameters.getStatus();
+ S3AFileStatus fileStatus = null;
+ if (providedStatus != null) {
+ // there's a file status
+
+ // make sure the file name matches -the rest of the path
+ // MUST NOT be checked.
+ Path providedStatusPath = providedStatus.getPath();
+ checkArgument(path.getName().equals(providedStatusPath.getName()),
+ "Filename mismatch between file being opened %s and"
+ + " supplied filestatus %s",
+ path, providedStatusPath);
+
+ // make sure the status references a file
+ if (providedStatus.isDirectory()) {
+ throw new FileNotFoundException(
+ "Supplied status references a directory " + providedStatus);
+ }
+ // build up the values
+ long len = providedStatus.getLen();
+ long modTime = providedStatus.getModificationTime();
+ String versionId;
+ String eTag;
+ // can use this status to skip our own probes,
+ LOG.debug("File was opened with a supplied FileStatus;"
+ + " skipping getFileStatus call in open() operation: {}",
+ providedStatus);
+
+ // what type is the status (and hence: what information does it contain?)
+ if (providedStatus instanceof S3AFileStatus) {
+ // is it an S3AFileSystem status?
+ S3AFileStatus st = (S3AFileStatus) providedStatus;
+ versionId = st.getVersionId();
+ eTag = st.getEtag();
+ } else if (providedStatus instanceof S3ALocatedFileStatus) {
+
+ // S3ALocatedFileStatus instance may supply etag and version.
+ S3ALocatedFileStatus st = (S3ALocatedFileStatus) providedStatus;
+ versionId = st.getVersionId();
+ eTag = st.getEtag();
+ } else {
+ // it is another type.
+ // build a status struct without etag or version.
+ LOG.debug("Converting file status {}", providedStatus);
+ versionId = null;
+ eTag = null;
+ }
+ // Construct a new file status with the real path of the file.
+ fileStatus = new S3AFileStatus(
+ len,
+ modTime,
+ path,
+ blockSize,
+ username,
+ eTag,
+ versionId);
+ // set the end of the read to the file length
+ fileLength = fileStatus.getLen();
+ }
+ // determine start and end of file.
+ long splitStart = options.getLong(FS_OPTION_OPENFILE_SPLIT_START, 0);
+
+ // split end
+ long splitEnd = options.getLong(FS_OPTION_OPENFILE_SPLIT_END,
+ LENGTH_UNKNOWN);
+ if (splitStart > 0 && splitStart > splitEnd) {
+ LOG.warn("Split start {} is greater than split end {}, resetting",
+ splitStart, splitEnd);
+ splitStart = 0;
+ }
+
+ // read end is the open file value
+ fileLength = options.getLong(FS_OPTION_OPENFILE_LENGTH, fileLength);
+
+ // if the read end has come from options, use that
+ // in creating a file status
+ if (fileLength >= 0 && fileStatus == null) {
+ fileStatus = createStatus(path, fileLength, blockSize);
+ }
+
+ // Build up the input policy.
+ // seek policy from default, s3a opt or standard option
+ // read from the FS standard option.
+ Collection<String> policies =
+ options.getStringCollection(FS_OPTION_OPENFILE_READ_POLICY);
+ if (policies.isEmpty()) {
+ // fall back to looking at the S3A-specific option.
+ policies = options.getStringCollection(INPUT_FADVISE);
+ }
+
+ return new OpenFileInformation()
+ .withS3Select(isSelect)
+ .withSql(sql)
+ .withAsyncDrainThreshold(
+ options.getLong(ASYNC_DRAIN_THRESHOLD,
+ defaultReadAhead))
Review Comment:
Default should be `defaultAsyncDrainThreshold`, not `defaultReadAhead`?
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be
evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
+```
+
+The inability to access/read a file MUST raise an `IOException`or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
+
+```java
+ future.get().read();
+```
+
+Access permission checks have the same visibility requirements: permission
failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent
operations.
+
+Note: some operations on the input stream, such as `seek()` may not attempt
any IO
+at all. Such operations MAY NOT raise exceotions when interacting with
+nonexistent/unreadable files.
+
+## <a name="options"></a> Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementation
+MUST expected to recognise and MAY support by changing the behavior of
+their input streams as a appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident
that
+the application will only be executed against releases of Hadoop which knows of
+the options -applications SHOULD set the options via `opt()` calls rather than
`must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
+
+If set as a `must()` parameter, unsupported "standard" options MUST be ignored.
+unrecognized standard options MUST be rejected.
+
+The standard `openFile()` options are defined
+in `org.apache.hadoop.fs.OpenFileOptions`; they all SHALL start
+with `fs.option.openfile.`.
+
+Note that while all `FileSystem`/`FileContext` instances SHALL support these
+options to the extent that `must()` declarations SHALL NOT fail, the
+implementations MAY support them to the extent of interpreting the values. This
+means that it is not a requirement for the stores to actually read the the read
+policy or file length values and use them when opening files.
Review Comment:
double `the`
alternatively...
```suggestion
means that it is not a requirement for the stores to actually read and use
the read
policy or file length values when opening files.
```
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -78,6 +84,11 @@ public class S3AInputStream extends FSInputStream implements
CanSetReadahead,
public static final String OPERATION_OPEN = "open";
public static final String OPERATION_REOPEN = "re-open";
+ /**
+ * size of a buffer to create when draining the stream.
+ */
+ private static final int DRAIN_BUFFER_SIZE = 16384;
Review Comment:
Why `16384` or `2^14`? What's the significance?
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be
evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
+```
+
+The inability to access/read a file MUST raise an `IOException`or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
+
+```java
+ future.get().read();
+```
+
+Access permission checks have the same visibility requirements: permission
failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent
operations.
+
+Note: some operations on the input stream, such as `seek()` may not attempt
any IO
+at all. Such operations MAY NOT raise exceotions when interacting with
+nonexistent/unreadable files.
+
+## <a name="options"></a> Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementation
+MUST expected to recognise and MAY support by changing the behavior of
+their input streams as a appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident
that
+the application will only be executed against releases of Hadoop which knows of
+the options -applications SHOULD set the options via `opt()` calls rather than
`must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
Review Comment:
This might be a little clearer?
```suggestion
If set as an `opt()` parameter, both unsupported and unrecognized "standard"
options MUST be ignored.
```
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java:
##########
@@ -602,37 +603,60 @@ private Constants() {
public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;
+ /**
+ * The threshold at which drain operations switch
+ * to being asynchronous with the schedule/wait overhead
+ * compared to synchronous.
+ * Value: {@value}
+ */
+ public static final String ASYNC_DRAIN_THRESHOLD =
"fs.s3a.input.async.drain.threshold";
+
+ /**
+ * This is a number based purely on experimentation in
+ * {@code ITestS3AInputStreamPerformance}.
+ * Value: {@value}
+ */
+ public static final int DEFAULT_ASYNC_DRAIN_THRESHOLD = 16_000;
+
/**
* Which input strategy to use for buffering, seeking and similar when
* reading data.
* Value: {@value}
*/
- @InterfaceStability.Unstable
public static final String INPUT_FADVISE =
"fs.s3a.experimental.input.fadvise";
+ /**
+ * The default value for this FS.
+ * Which for S3A, is adaptive.
+ * Value: {@value}
+ */
+ public static final String INPUT_FADV_DEFAULT =
+ Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
+
/**
* General input. Some seeks, some reads.
+ * The policy name "default" is standard across different stores,
+ * and should be preferred.
* Value: {@value}
*/
- @InterfaceStability.Unstable
public static final String INPUT_FADV_NORMAL = "normal";
/**
* Optimized for sequential access.
* Value: {@value}
*/
- @InterfaceStability.Unstable
- public static final String INPUT_FADV_SEQUENTIAL = "sequential";
+ public static final String INPUT_FADV_SEQUENTIAL =
+ Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
Review Comment:
Should we explicitly deprecate in favour of using the Hadoop Common
constants?
Also applies to `INPUT_FADV_RANDOM`.
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be
evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
+```
+
+The inability to access/read a file MUST raise an `IOException`or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
+
+```java
+ future.get().read();
+```
+
+Access permission checks have the same visibility requirements: permission
failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent
operations.
+
+Note: some operations on the input stream, such as `seek()` may not attempt
any IO
+at all. Such operations MAY NOT raise exceotions when interacting with
+nonexistent/unreadable files.
+
+## <a name="options"></a> Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementation
+MUST expected to recognise and MAY support by changing the behavior of
+their input streams as a appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident
that
+the application will only be executed against releases of Hadoop which knows of
+the options -applications SHOULD set the options via `opt()` calls rather than
`must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
+
+If set as a `must()` parameter, unsupported "standard" options MUST be ignored.
+unrecognized standard options MUST be rejected.
+
+The standard `openFile()` options are defined
+in `org.apache.hadoop.fs.OpenFileOptions`; they all SHALL start
+with `fs.option.openfile.`.
+
+Note that while all `FileSystem`/`FileContext` instances SHALL support these
+options to the extent that `must()` declarations SHALL NOT fail, the
+implementations MAY support them to the extent of interpreting the values. This
+means that it is not a requirement for the stores to actually read the the read
+policy or file length values and use them when opening files.
+
+Unless otherwise stated, they SHOULD be viewed as hints.
+
+Note: if a standard option is added such that if set but not
+supported would be an error, then implementations SHALL reject it. For example,
+the S3A filesystem client supports the ability to push down SQL commands. If
+something like that were ever standardized, then the use of the option, either
+in `opt()` or `must()` argument MUST be rejected for filesystems which don't
+support the feature.
+
+### <a name="buffer.size"></a> Option: `fs.option.openfile.buffer.size`
+
+Read buffer size in bytes.
+
+This overrides the default value set in the configuration with the option
+`io.file.buffer.size`.
+
+It is supported by all filesystem clients which allow for stream-specific
buffer
+sizes to be set via `FileSystem.open(path, buffersize)`.
+
+### <a name="read.policy"></a> Option: `fs.option.openfile.read.policy`
+
+Declare the read policy of the input stream. This is a hint as to what the
+expected read pattern of an input stream will be. This MAY control readahead,
+buffering and other optimizations.
+
+Sequential reads may be optimized with prefetching data and/or reading data in
+larger blocks. Some applications (e.g. distCp) perform sequential IO even over
+columnar data.
+
+In contrast, random IO reads data in different parts of the file using a
+sequence of `seek()/read()`
+or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations
+
+Queries over columnar formats such as Apache ORC and Apache Parquet perform
such
+random IO; other data formats may be best read with sequential or whole-file
+policies.
+
+What is key is that optimizing reads for seqential reads may impair random
+performance -and vice versa.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the
+ filesystem MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem specific
+ behavior -and it may change with Hadoop releases and/or specific storage
+ subsystems.
+1. If a policy is not recognized, the filesystem client MUST ignore it.
+
+| Policy | Meaning |
+|--------------|----------------------------------------------------------|
+| `adaptive` | Any adaptive policy implemented by the store. |
+| `default` | The default policy for this store. Generally "adaptive". |
+| `random` | Optimize for random acdess. |
+| `sequential` | Optimize for sequential access. |
+| `vector` | The Vectored IO API is intended to be used. |
+| `whole-file` | The whole file will be read. |
+
+Choosing the wrong read policy for an input source may be inefficient.
+
+A list of read policies MAY be supplied; the first one recognized/supported by
+the filesystem SHALL be the one used. This allows for custom policies to be
+supported, for example an `hbase-hfile` policy optimized for HBase HFiles.
+
+The S3A and ABFS input streams both implement
+the [IOStatisticsSource](iostatistics.html) API, and can be queried for their
IO
+Performance.
+
+*Tip:* log the `toString()` value of input streams at `DEBUG`. The S3A and ABFS
+Input Streams log read statistics, which can provide insight about whether
reads
+are being performed efficiently or not.
+
+_Futher reading_
+
+* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
+* [Windows
`CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
+
+#### <a name="read.policy."></a> Read Policy `adaptive`
+
+Try to adapt the seek policy to the read pattern of the application.
+
+The `normal` policy of the S3A client and the sole policy supported by
+the `wasb:` client are both adaptive -they assume sequential IO, but once a
+backwards seek/positioned read call is made the stream switches to random IO.
+
+Other filesystem implementations may wish to adopt similar strategies, and/or
+extend the algorithms to detect forward seeks and/or switch from random to
+sequential IO if that is considered more efficient.
+
+Adaptive read policies are the absence of the ability to
+declare the seek policy in the `open()` API, so requiring it to be declared, if
+configurable, in the cluster/application configuration. However, the switch
from
+sequential to random seek policies may be exensive.
Review Comment:
expensive?
```suggestion
sequential to random seek policies may be expensive.
```
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be
evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
+```
+
+The inability to access/read a file MUST raise an `IOException`or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
+
+```java
+ future.get().read();
+```
+
+Access permission checks have the same visibility requirements: permission
failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent
operations.
+
+Note: some operations on the input stream, such as `seek()` may not attempt
any IO
+at all. Such operations MAY NOT raise exceotions when interacting with
+nonexistent/unreadable files.
+
+## <a name="options"></a> Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementation
+MUST expected to recognise and MAY support by changing the behavior of
+their input streams as a appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident
that
+the application will only be executed against releases of Hadoop which knows of
+the options -applications SHOULD set the options via `opt()` calls rather than
`must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
+
+If set as a `must()` parameter, unsupported "standard" options MUST be ignored.
+unrecognized standard options MUST be rejected.
+
+The standard `openFile()` options are defined
+in `org.apache.hadoop.fs.OpenFileOptions`; they all SHALL start
+with `fs.option.openfile.`.
+
+Note that while all `FileSystem`/`FileContext` instances SHALL support these
+options to the extent that `must()` declarations SHALL NOT fail, the
+implementations MAY support them to the extent of interpreting the values. This
+means that it is not a requirement for the stores to actually read the the read
+policy or file length values and use them when opening files.
+
+Unless otherwise stated, they SHOULD be viewed as hints.
+
+Note: if a standard option is added such that if set but not
+supported would be an error, then implementations SHALL reject it. For example,
+the S3A filesystem client supports the ability to push down SQL commands. If
+something like that were ever standardized, then the use of the option, either
+in `opt()` or `must()` argument MUST be rejected for filesystems which don't
+support the feature.
+
+### <a name="buffer.size"></a> Option: `fs.option.openfile.buffer.size`
+
+Read buffer size in bytes.
+
+This overrides the default value set in the configuration with the option
+`io.file.buffer.size`.
+
+It is supported by all filesystem clients which allow for stream-specific
buffer
+sizes to be set via `FileSystem.open(path, buffersize)`.
+
+### <a name="read.policy"></a> Option: `fs.option.openfile.read.policy`
+
+Declare the read policy of the input stream. This is a hint as to what the
+expected read pattern of an input stream will be. This MAY control readahead,
+buffering and other optimizations.
+
+Sequential reads may be optimized with prefetching data and/or reading data in
+larger blocks. Some applications (e.g. distCp) perform sequential IO even over
+columnar data.
+
+In contrast, random IO reads data in different parts of the file using a
+sequence of `seek()/read()`
+or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations
+
+Queries over columnar formats such as Apache ORC and Apache Parquet perform
such
+random IO; other data formats may be best read with sequential or whole-file
+policies.
+
+What is key is that optimizing reads for seqential reads may impair random
+performance -and vice versa.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the
+ filesystem MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem specific
+ behavior -and it may change with Hadoop releases and/or specific storage
+ subsystems.
+1. If a policy is not recognized, the filesystem client MUST ignore it.
+
+| Policy | Meaning |
+|--------------|----------------------------------------------------------|
+| `adaptive` | Any adaptive policy implemented by the store. |
+| `default` | The default policy for this store. Generally "adaptive". |
+| `random` | Optimize for random acdess. |
+| `sequential` | Optimize for sequential access. |
+| `vector` | The Vectored IO API is intended to be used. |
+| `whole-file` | The whole file will be read. |
+
+Choosing the wrong read policy for an input source may be inefficient.
+
+A list of read policies MAY be supplied; the first one recognized/supported by
+the filesystem SHALL be the one used. This allows for custom policies to be
+supported, for example an `hbase-hfile` policy optimized for HBase HFiles.
+
+The S3A and ABFS input streams both implement
+the [IOStatisticsSource](iostatistics.html) API, and can be queried for their
IO
+Performance.
+
+*Tip:* log the `toString()` value of input streams at `DEBUG`. The S3A and ABFS
+Input Streams log read statistics, which can provide insight about whether
reads
+are being performed efficiently or not.
+
+_Futher reading_
+
+* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
+* [Windows
`CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
+
+#### <a name="read.policy."></a> Read Policy `adaptive`
+
+Try to adapt the seek policy to the read pattern of the application.
+
+The `normal` policy of the S3A client and the sole policy supported by
+the `wasb:` client are both adaptive -they assume sequential IO, but once a
+backwards seek/positioned read call is made the stream switches to random IO.
+
+Other filesystem implementations may wish to adopt similar strategies, and/or
+extend the algorithms to detect forward seeks and/or switch from random to
+sequential IO if that is considered more efficient.
+
+Adaptive read policies are the absence of the ability to
+declare the seek policy in the `open()` API, so requiring it to be declared, if
+configurable, in the cluster/application configuration. However, the switch
from
+sequential to random seek policies may be exensive.
+
+When applications explicitly set the `fs.option.openfile.read.policy` option,
if
+they know their read plan, they SHOULD declare which policy is most
appropriate.
+
+#### <a name="read.policy.default"></a> Read Policy ``
+
+The default policy for the filesystem instance.
+Implementation/installation-specific.
+
+#### <a name="read.policy.sequential"></a> Read Policy `sequential`
+
+Expect sequential reads from the first byte read to the end of the file/until
+the stream is closed.
+
+#### <a name="read.policy.random"></a> Read Policy `random`
+
+Expect `seek()/read()` sequences, or use of `PositionedReadable`
+or `ByteBufferPositionedReadable` APIs.
+
+
+#### <a name="read.policy.vector"></a> Read Policy `vector`
+
+This declares that the caller intends to use the Vectored read API of
+[HADOOP-11867](https://issues.apache.org/jira/browse/HADOOP-11867)
+_Add a high-performance vectored read API_.
+
+This is a hint: it is not a requirement when using the API.
+It does inform the implemenations that the stream should be
+configured for optimal vectored IO performance, if such a
+feature has been implemented.
+
+It is *not* exclusive: the same stream may still be used for
+classic `InputStream` and `PositionedRead` API calls.
+Implementations SHOULD use the `random` read policy
+with these operations.
+
+#### <a name="read.policy.whole-file"></a> Read Policy `whole-file`
+
+
+This declares that the whole file is to be read end-to-end; the file system
client is free to enable
+whatever strategies maximise performance for this. In particular, larger
ranged reads/GETs can
+deliver high bandwidth by reducing socket/TLS setup costs and providing a
connection long-lived
+enough for TCP flow control to determine the optimal download rate.
+
+Strategies can include:
+
+* Initiate an HTTP GET of the entire file in `openFile()` operation.
+* Prefech data in large blocks, possibly in parallel read operations.
+
+Applications which know that the entire file is to be read from an opened
stream SHOULD declare this
+read policy.
+
+### <a name="openfile.length"></a> Option: `fs.option.openfile.length`
+
+Declare the length of a file.
+
+This can be used by clients to skip querying a remote store for the size
+of/existence of a file when opening it, similar to declaring a file status
+through the `withFileStatus()` option.
+
+If supported by a filesystem connector, this option MUST be interpreted as
+declaring the minimum length of the file:
+
+1. If the value is negative, the option SHALL be considered unset.
+2. It SHALL NOT be an error if the actual length of the file is greater than
+ this value.
+3. `read()`, `seek()` and positioned read calls MAY use a position
across/beyond
+ this length but below the actual length of the file. Implementations MAY
+ raise `EOFExceptions` in such cases, or they MAY return data.
+
+If this option is used by the FileSystem implementation
+
+*Implementor's Notes*
+
+* A value of `fs.option.openfile.length` < 0 MUST be rejected.
+* If a file status is supplied along with a value in `fs.opt.openfile.length`;
+ the file status values take precedence.
+
+### <a name="split.start"></a> Options: `fs.option.openfile.` and
`fs.option.openfile.split.end`
+
+Declare the start and end of the split when a file has been split for
processing
+in pieces.
+
+1. If a value is negative, the option SHALL be considered unset.
+1. Filesystems MAY assume that the length of the file is greater than or equal
+ to the value of `fs.option.openfile.split.end`.
+1. And that they MAY raise an exception if the client application reads past
the
+ value set in `fs.option.openfile.split.end`.
+1. The pair of options MAY be used to optimise the read plan, such as setting
+ the content range for GET requests, or using the split end as an implicit
+ declaration of the guaranteed minimum length of the file.
+1. If both options are set, and the split start is declared as greater than the
+ split end, then the split start SHOULD just be reset to zero, rather than
+ rejecting the operation.
+
+The split end value can provide a hint as to the end of the input stream. The
+split start can be used to optimize any initial read offset for filesystem
+clients.
+
+*Note for implementors: applications will read past the end of a split when
they
+need to read to the end of a record/line which begins before the end of the
+split.
+
+Therefore clients MUST be allowed to `seek()`/`read()` past the length
+set in `fs.option.openfile.split.end` if the file is actually longer
+than that value.
+
+## <a name="s3a"></a> S3A-specific options
+
+The S3A Connector supports custom options for readahead and seek policy.
+
+| Name | Type | Meaning
|
+|--------------------------------------|----------|-------------------------------------------------------------|
+| `fs.s3a.readahead.range` | `long` | readahead range in bytes
|
+| `fs.s3a.input.async.drain.threshold` | `long` | threshold to switch to
asynchronous draining of the stream |
+| `fs.s3a.experimental.input.fadvise` | `String` | seek policy. Superceded by
`fs.option.openfile.read.policy` |
+
+If the option set contains a SQL statement in the `fs.s3a.select.sql`
statement,
+then the file is opened as an S3 Select query.
+Consult the S3A documentation for more details.
+
+## <a name="abfs"></a> ABFS-specific options
+
+The ABFS Connector supports custom input stream options.
+
+| Name | Type | Meaning
|
+|-----------------------------------|-----------|----------------------------------------------------|
+| `fs.azure.buffered.pread.disable` | `boolean` | disable caching on the
positioned read operations. |
+
+
+Disables caching on data read through the
[PositionedReadable](fsdatainputstream.html#PositionedReadable)
+APIs.
+
+Consult the ABFS Documentation for more details.
+
+## <a name="examples"></a> Examples
+
+#### Declaring seek policy and split limits when opening a file.
+
+Here is an example from a proof of
+concept `org.apache.parquet.hadoop.util.HadoopInputFile`
+reader which uses a (nullable) file status and a split start/end.
+
+The `FileStatus` value is always passed in -but if it is null, then the split
+end is used to declare the length of the file.
+
+```java
+protected SeekableInputStream newStream(Path path, FileStatus stat,
+ long splitStart, long splitEnd)
+ throws IOException {
+
+ FutureDataInputStreamBuilder builder = fs.openFile(path)
+ .opt("fs.option.openfile.read.policy", "vector, random")
+ .withFileStatus(stat);
+
+ builder.opt("fs.option.openfile.split.start", splitStart);
+ builder.opt("fs.option.openfile.split.end", splitEnd);
+ CompletableFuture<FSDataInputStream> streamF = builder.build();
+ return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
+}
```
-The preconditions for opening the file are checked during the asynchronous
-evaluation, and so will surface when the future is completed:
+As a result, whether driven directly by a file listing, or when opening a file
+from a query plan of `(path, splitStart, splitEnd)`, there is no need to probe
+the remote store for the length of the file. When working with remote object
+stores, this can save tens to hundreds of milliseconds, even if such a probe is
+done asynchronously.
+
+If both the file length and the split end is set, then the file length MUST be
+considered "more" authoritative, that is it really SHOULD be defining the file
+length. If the split end is set, the caller MAY ot read past it.
Review Comment:
typo: ot?
Should it be dropped?
```suggestion
If both the file length and the split end is set, then the file length MUST
be
considered "more" authoritative, that is it really SHOULD be defining the
file
length. If the split end is set, the caller MAY read past it.
```
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -4085,9 +4089,7 @@ private void createEmptyObject(final String objectName)
/**
* Return the number of bytes that large input files should be optimally
* be split into to minimize I/O time.
- * @deprecated use {@link #getDefaultBlockSize(Path)} instead
*/
- @Deprecated
Review Comment:
Is this method missing an `@Override` tag?
`FileSystem#getDefaultBlockSize()` is deprecated
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be
evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
+```
+
+The inability to access/read a file MUST raise an `IOException`or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
+
+```java
+ future.get().read();
+```
+
+Access permission checks have the same visibility requirements: permission
failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent
operations.
+
+Note: some operations on the input stream, such as `seek()` may not attempt
any IO
+at all. Such operations MAY NOT raise exceotions when interacting with
+nonexistent/unreadable files.
+
+## <a name="options"></a> Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementation
+MUST expected to recognise and MAY support by changing the behavior of
+their input streams as a appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident
that
+the application will only be executed against releases of Hadoop which knows of
+the options -applications SHOULD set the options via `opt()` calls rather than
`must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
+
+If set as a `must()` parameter, unsupported "standard" options MUST be ignored.
+unrecognized standard options MUST be rejected.
+
+The standard `openFile()` options are defined
+in `org.apache.hadoop.fs.OpenFileOptions`; they all SHALL start
+with `fs.option.openfile.`.
+
+Note that while all `FileSystem`/`FileContext` instances SHALL support these
+options to the extent that `must()` declarations SHALL NOT fail, the
+implementations MAY support them to the extent of interpreting the values. This
+means that it is not a requirement for the stores to actually read the the read
+policy or file length values and use them when opening files.
+
+Unless otherwise stated, they SHOULD be viewed as hints.
+
+Note: if a standard option is added such that if set but not
+supported would be an error, then implementations SHALL reject it. For example,
+the S3A filesystem client supports the ability to push down SQL commands. If
+something like that were ever standardized, then the use of the option, either
+in `opt()` or `must()` argument MUST be rejected for filesystems which don't
+support the feature.
+
+### <a name="buffer.size"></a> Option: `fs.option.openfile.buffer.size`
+
+Read buffer size in bytes.
+
+This overrides the default value set in the configuration with the option
+`io.file.buffer.size`.
+
+It is supported by all filesystem clients which allow for stream-specific
buffer
+sizes to be set via `FileSystem.open(path, buffersize)`.
+
+### <a name="read.policy"></a> Option: `fs.option.openfile.read.policy`
+
+Declare the read policy of the input stream. This is a hint as to what the
+expected read pattern of an input stream will be. This MAY control readahead,
+buffering and other optimizations.
+
+Sequential reads may be optimized with prefetching data and/or reading data in
+larger blocks. Some applications (e.g. distCp) perform sequential IO even over
+columnar data.
+
+In contrast, random IO reads data in different parts of the file using a
+sequence of `seek()/read()`
+or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations
+
+Queries over columnar formats such as Apache ORC and Apache Parquet perform
such
+random IO; other data formats may be best read with sequential or whole-file
+policies.
+
+What is key is that optimizing reads for seqential reads may impair random
+performance -and vice versa.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the
+ filesystem MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem specific
+ behavior -and it may change with Hadoop releases and/or specific storage
+ subsystems.
+1. If a policy is not recognized, the filesystem client MUST ignore it.
+
+| Policy | Meaning |
+|--------------|----------------------------------------------------------|
+| `adaptive` | Any adaptive policy implemented by the store. |
+| `default` | The default policy for this store. Generally "adaptive". |
+| `random` | Optimize for random acdess. |
+| `sequential` | Optimize for sequential access. |
+| `vector` | The Vectored IO API is intended to be used. |
+| `whole-file` | The whole file will be read. |
+
+Choosing the wrong read policy for an input source may be inefficient.
+
+A list of read policies MAY be supplied; the first one recognized/supported by
+the filesystem SHALL be the one used. This allows for custom policies to be
+supported, for example an `hbase-hfile` policy optimized for HBase HFiles.
+
+The S3A and ABFS input streams both implement
+the [IOStatisticsSource](iostatistics.html) API, and can be queried for their
IO
+Performance.
+
+*Tip:* log the `toString()` value of input streams at `DEBUG`. The S3A and ABFS
+Input Streams log read statistics, which can provide insight about whether
reads
+are being performed efficiently or not.
+
+_Futher reading_
+
+* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
+* [Windows
`CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
+
+#### <a name="read.policy."></a> Read Policy `adaptive`
+
+Try to adapt the seek policy to the read pattern of the application.
+
+The `normal` policy of the S3A client and the sole policy supported by
+the `wasb:` client are both adaptive -they assume sequential IO, but once a
+backwards seek/positioned read call is made the stream switches to random IO.
+
+Other filesystem implementations may wish to adopt similar strategies, and/or
+extend the algorithms to detect forward seeks and/or switch from random to
+sequential IO if that is considered more efficient.
+
+Adaptive read policies are the absence of the ability to
+declare the seek policy in the `open()` API, so requiring it to be declared, if
+configurable, in the cluster/application configuration. However, the switch
from
+sequential to random seek policies may be exensive.
+
+When applications explicitly set the `fs.option.openfile.read.policy` option,
if
+they know their read plan, they SHOULD declare which policy is most
appropriate.
+
+#### <a name="read.policy.default"></a> Read Policy ``
+
+The default policy for the filesystem instance.
+Implementation/installation-specific.
+
+#### <a name="read.policy.sequential"></a> Read Policy `sequential`
+
+Expect sequential reads from the first byte read to the end of the file/until
+the stream is closed.
+
+#### <a name="read.policy.random"></a> Read Policy `random`
+
+Expect `seek()/read()` sequences, or use of `PositionedReadable`
+or `ByteBufferPositionedReadable` APIs.
+
+
+#### <a name="read.policy.vector"></a> Read Policy `vector`
+
+This declares that the caller intends to use the Vectored read API of
+[HADOOP-11867](https://issues.apache.org/jira/browse/HADOOP-11867)
+_Add a high-performance vectored read API_.
+
+This is a hint: it is not a requirement when using the API.
+It does inform the implemenations that the stream should be
+configured for optimal vectored IO performance, if such a
+feature has been implemented.
+
+It is *not* exclusive: the same stream may still be used for
+classic `InputStream` and `PositionedRead` API calls.
+Implementations SHOULD use the `random` read policy
+with these operations.
+
+#### <a name="read.policy.whole-file"></a> Read Policy `whole-file`
+
+
+This declares that the whole file is to be read end-to-end; the file system
client is free to enable
+whatever strategies maximise performance for this. In particular, larger
ranged reads/GETs can
+deliver high bandwidth by reducing socket/TLS setup costs and providing a
connection long-lived
+enough for TCP flow control to determine the optimal download rate.
+
+Strategies can include:
+
+* Initiate an HTTP GET of the entire file in `openFile()` operation.
+* Prefech data in large blocks, possibly in parallel read operations.
+
+Applications which know that the entire file is to be read from an opened
stream SHOULD declare this
+read policy.
+
+### <a name="openfile.length"></a> Option: `fs.option.openfile.length`
+
+Declare the length of a file.
+
+This can be used by clients to skip querying a remote store for the size
+of/existence of a file when opening it, similar to declaring a file status
+through the `withFileStatus()` option.
+
+If supported by a filesystem connector, this option MUST be interpreted as
+declaring the minimum length of the file:
+
+1. If the value is negative, the option SHALL be considered unset.
+2. It SHALL NOT be an error if the actual length of the file is greater than
+ this value.
+3. `read()`, `seek()` and positioned read calls MAY use a position
across/beyond
+ this length but below the actual length of the file. Implementations MAY
+ raise `EOFExceptions` in such cases, or they MAY return data.
+
+If this option is used by the FileSystem implementation
+
+*Implementor's Notes*
+
+* A value of `fs.option.openfile.length` < 0 MUST be rejected.
Review Comment:
Implementations are recommended to reject < 0, but above we consider the
option unset if negative.
If a negative length is supplied, should users expect it to be rejected or
ignored?
##########
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md:
##########
@@ -128,26 +193,499 @@ completed, returns an input stream which can read data
from the filesystem.
The `build()` operation MAY perform the validation of the file's existence,
its kind, so rejecting attempts to read from a directory or non-existent
-file. **Alternatively**, the `build()` operation may delay all checks
-until an asynchronous operation whose outcome is provided by the `Future`
+file. Alternatively
+* file existence/status checks MAY be performed asynchronously within the
returned
+ `CompletableFuture<>`.
+* file existence/status checks MAY be postponed until the first byte is read in
+ any of the read such as `read()` or `PositionedRead`.
That is, the precondition `exists(FS, path)` and `isFile(FS, path)` are
-only guaranteed to have been met after the `get()` on the returned future is
successful.
+only guaranteed to have been met after the `get()` called on returned future
+and an attempt has been made to read the stream.
-Thus, if even a file does not exist, the following call will still succeed,
returning
-a future to be evaluated.
+Thus, if even when file does not exist, or is a directory rather than a file,
+the following call MUST succeed, returning a `CompletableFuture` to be
evaluated.
```java
Path p = new Path("file://tmp/file-which-does-not-exist");
CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
.openFile(p)
- .build;
+ .build();
+```
+
+The inability to access/read a file MUST raise an `IOException`or subclass
+in either the future's `get()` call, or, for late binding operations,
+when an operation to read data is invoked.
+
+Therefore the following sequence SHALL fail when invoked on the
+`future` returned by the previous example.
+
+```java
+ future.get().read();
+```
+
+Access permission checks have the same visibility requirements: permission
failures
+MUST be delayed until the `get()` call and MAY be delayed into subsequent
operations.
+
+Note: some operations on the input stream, such as `seek()` may not attempt
any IO
+at all. Such operations MAY NOT raise exceotions when interacting with
+nonexistent/unreadable files.
+
+## <a name="options"></a> Standard `openFile()` options since Hadoop 3.3.3
+
+These are options which `FileSystem` and `FileContext` implementation
+MUST expected to recognise and MAY support by changing the behavior of
+their input streams as a appropriate.
+
+Hadoop 3.3.0 added the `openFile()` API; these standard options were defined in
+a later release. Therefore, although they are "well known", unless confident
that
+the application will only be executed against releases of Hadoop which knows of
+the options -applications SHOULD set the options via `opt()` calls rather than
`must()`.
+
+When opening a file through the `openFile()` builder API, callers MAY use
+both `.opt(key, value)` and `.must(key, value)` calls to set standard and
+filesystem-specific options.
+
+If set as an `opt()` parameter, unsupported "standard" options MUST be ignored,
+as MUST unrecognized standard options.
+
+If set as a `must()` parameter, unsupported "standard" options MUST be ignored.
+unrecognized standard options MUST be rejected.
+
+The standard `openFile()` options are defined
+in `org.apache.hadoop.fs.OpenFileOptions`; they all SHALL start
+with `fs.option.openfile.`.
+
+Note that while all `FileSystem`/`FileContext` instances SHALL support these
+options to the extent that `must()` declarations SHALL NOT fail, the
+implementations MAY support them to the extent of interpreting the values. This
+means that it is not a requirement for the stores to actually read the the read
+policy or file length values and use them when opening files.
+
+Unless otherwise stated, they SHOULD be viewed as hints.
+
+Note: if a standard option is added such that if set but not
+supported would be an error, then implementations SHALL reject it. For example,
+the S3A filesystem client supports the ability to push down SQL commands. If
+something like that were ever standardized, then the use of the option, either
+in `opt()` or `must()` argument MUST be rejected for filesystems which don't
+support the feature.
+
+### <a name="buffer.size"></a> Option: `fs.option.openfile.buffer.size`
+
+Read buffer size in bytes.
+
+This overrides the default value set in the configuration with the option
+`io.file.buffer.size`.
+
+It is supported by all filesystem clients which allow for stream-specific
buffer
+sizes to be set via `FileSystem.open(path, buffersize)`.
+
+### <a name="read.policy"></a> Option: `fs.option.openfile.read.policy`
+
+Declare the read policy of the input stream. This is a hint as to what the
+expected read pattern of an input stream will be. This MAY control readahead,
+buffering and other optimizations.
+
+Sequential reads may be optimized with prefetching data and/or reading data in
+larger blocks. Some applications (e.g. distCp) perform sequential IO even over
+columnar data.
+
+In contrast, random IO reads data in different parts of the file using a
+sequence of `seek()/read()`
+or via the `PositionedReadable` or `ByteBufferPositionedReadable` APIs.
+
+Random IO performance may be best if little/no prefetching takes place, along
+with other possible optimizations
+
+Queries over columnar formats such as Apache ORC and Apache Parquet perform
such
+random IO; other data formats may be best read with sequential or whole-file
+policies.
+
+What is key is that optimizing reads for seqential reads may impair random
+performance -and vice versa.
+
+1. The seek policy is a hint; even if declared as a `must()` option, the
+ filesystem MAY ignore it.
+1. The interpretation/implementation of a policy is a filesystem specific
+ behavior -and it may change with Hadoop releases and/or specific storage
+ subsystems.
+1. If a policy is not recognized, the filesystem client MUST ignore it.
+
+| Policy | Meaning |
+|--------------|----------------------------------------------------------|
+| `adaptive` | Any adaptive policy implemented by the store. |
+| `default` | The default policy for this store. Generally "adaptive". |
+| `random` | Optimize for random acdess. |
+| `sequential` | Optimize for sequential access. |
+| `vector` | The Vectored IO API is intended to be used. |
+| `whole-file` | The whole file will be read. |
+
+Choosing the wrong read policy for an input source may be inefficient.
+
+A list of read policies MAY be supplied; the first one recognized/supported by
+the filesystem SHALL be the one used. This allows for custom policies to be
+supported, for example an `hbase-hfile` policy optimized for HBase HFiles.
+
+The S3A and ABFS input streams both implement
+the [IOStatisticsSource](iostatistics.html) API, and can be queried for their
IO
+Performance.
+
+*Tip:* log the `toString()` value of input streams at `DEBUG`. The S3A and ABFS
+Input Streams log read statistics, which can provide insight about whether
reads
+are being performed efficiently or not.
+
+_Futher reading_
+
+* [Linux fadvise()](https://linux.die.net/man/2/fadvise).
+* [Windows
`CreateFile()`](https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#caching-behavior)
+
+#### <a name="read.policy."></a> Read Policy `adaptive`
+
+Try to adapt the seek policy to the read pattern of the application.
+
+The `normal` policy of the S3A client and the sole policy supported by
+the `wasb:` client are both adaptive -they assume sequential IO, but once a
+backwards seek/positioned read call is made the stream switches to random IO.
+
+Other filesystem implementations may wish to adopt similar strategies, and/or
+extend the algorithms to detect forward seeks and/or switch from random to
+sequential IO if that is considered more efficient.
+
+Adaptive read policies are the absence of the ability to
+declare the seek policy in the `open()` API, so requiring it to be declared, if
+configurable, in the cluster/application configuration. However, the switch
from
+sequential to random seek policies may be exensive.
+
+When applications explicitly set the `fs.option.openfile.read.policy` option,
if
+they know their read plan, they SHOULD declare which policy is most
appropriate.
+
+#### <a name="read.policy.default"></a> Read Policy ``
+
+The default policy for the filesystem instance.
+Implementation/installation-specific.
+
+#### <a name="read.policy.sequential"></a> Read Policy `sequential`
+
+Expect sequential reads from the first byte read to the end of the file/until
+the stream is closed.
+
+#### <a name="read.policy.random"></a> Read Policy `random`
+
+Expect `seek()/read()` sequences, or use of `PositionedReadable`
+or `ByteBufferPositionedReadable` APIs.
+
+
+#### <a name="read.policy.vector"></a> Read Policy `vector`
+
+This declares that the caller intends to use the Vectored read API of
+[HADOOP-11867](https://issues.apache.org/jira/browse/HADOOP-11867)
+_Add a high-performance vectored read API_.
+
+This is a hint: it is not a requirement when using the API.
+It does inform the implemenations that the stream should be
+configured for optimal vectored IO performance, if such a
+feature has been implemented.
+
+It is *not* exclusive: the same stream may still be used for
+classic `InputStream` and `PositionedRead` API calls.
+Implementations SHOULD use the `random` read policy
+with these operations.
+
+#### <a name="read.policy.whole-file"></a> Read Policy `whole-file`
+
+
+This declares that the whole file is to be read end-to-end; the file system
client is free to enable
+whatever strategies maximise performance for this. In particular, larger
ranged reads/GETs can
+deliver high bandwidth by reducing socket/TLS setup costs and providing a
connection long-lived
+enough for TCP flow control to determine the optimal download rate.
+
+Strategies can include:
+
+* Initiate an HTTP GET of the entire file in `openFile()` operation.
+* Prefech data in large blocks, possibly in parallel read operations.
+
+Applications which know that the entire file is to be read from an opened
stream SHOULD declare this
+read policy.
+
+### <a name="openfile.length"></a> Option: `fs.option.openfile.length`
+
+Declare the length of a file.
+
+This can be used by clients to skip querying a remote store for the size
+of/existence of a file when opening it, similar to declaring a file status
+through the `withFileStatus()` option.
+
+If supported by a filesystem connector, this option MUST be interpreted as
+declaring the minimum length of the file:
+
+1. If the value is negative, the option SHALL be considered unset.
+2. It SHALL NOT be an error if the actual length of the file is greater than
+ this value.
+3. `read()`, `seek()` and positioned read calls MAY use a position
across/beyond
+ this length but below the actual length of the file. Implementations MAY
+ raise `EOFExceptions` in such cases, or they MAY return data.
+
+If this option is used by the FileSystem implementation
+
+*Implementor's Notes*
+
+* A value of `fs.option.openfile.length` < 0 MUST be rejected.
+* If a file status is supplied along with a value in `fs.opt.openfile.length`;
+ the file status values take precedence.
+
+### <a name="split.start"></a> Options: `fs.option.openfile.` and
`fs.option.openfile.split.end`
Review Comment:
missing `split.start`?
```suggestion
### <a name="split.start"></a> Options: `fs.option.openfile.split.start` and
`fs.option.openfile.split.end`
```
Issue Time Tracking
-------------------
Worklog Id: (was: 756378)
Time Spent: 19h 20m (was: 19h 10m)
> Enhance openFile() for better read performance against object stores
> ---------------------------------------------------------------------
>
> Key: HADOOP-16202
> URL: https://issues.apache.org/jira/browse/HADOOP-16202
> Project: Hadoop Common
> Issue Type: Bug
> Components: fs, fs/s3, tools/distcp
> Affects Versions: 3.3.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
> Time Spent: 19h 20m
> Remaining Estimate: 0h
>
> The {{openFile()}} builder API lets us add new options when reading a file
> Add an option {{"fs.s3a.open.option.length"}} which takes a long and allows
> the length of the file to be declared. If set, *no check for the existence of
> the file is issued when opening the file*
> Also: withFileStatus() to take any FileStatus implementation, rather than
> only S3AFileStatus -and not check that the path matches the path being
> opened. Needed to support viewFS-style wrapping and mounting.
> and Adopt where appropriate to stop clusters with S3A reads switched to
> random IO from killing download/localization
> * fs shell copyToLocal
> * distcp
> * IOUtils.copy
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]