This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4c0bbc488091cf941ec9e8dfcf589bda5e7592cd
Author: Seth Wiesman <[email protected]>
AuthorDate: Wed May 1 11:45:57 2019 -0500

    [hotfix][docs] Some fixes to FileSystem Documentation
    
    This closes #8326
---
 docs/ops/deployment/aws.md        | 317 -------------------------------
 docs/ops/deployment/aws.zh.md     | 387 --------------------------------------
 docs/ops/filesystems/common.md    |  27 +--
 docs/ops/filesystems/common.zh.md |  27 +--
 docs/ops/filesystems/index.md     |  36 ++--
 docs/ops/filesystems/index.zh.md  | 130 +++----------
 docs/ops/filesystems/oss.md       | 178 ++----------------
 docs/ops/filesystems/oss.zh.md    | 179 ++----------------
 docs/ops/filesystems/s3.md        |  39 ++--
 docs/ops/filesystems/s3.zh.md     |  39 ++--
 10 files changed, 131 insertions(+), 1228 deletions(-)

diff --git a/docs/ops/deployment/aws.md b/docs/ops/deployment/aws.md
index a639e9c..b465340 100644
--- a/docs/ops/deployment/aws.md
+++ b/docs/ops/deployment/aws.md
@@ -63,320 +63,3 @@ HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m 
yarn-cluster -yn 1 examples/
 {% endhighlight %}
 
 {% top %}
-
-### Hadoop-provided S3 file systems
-
-{% panel **Note:** You don't have to configure this manually if you are 
running [Flink on EMR](#emr-elastic-mapreduce). %}
-
-Apache Flink provides native [S3 FileSystem's](../filesystems/s3.html) out of 
the box and we recomend using them unless required otherwise, e.g. for using S3 
as YARN's resource storage dir
-via the `fs.defaultFS` configuration property in Hadoop's `core-site.xml`.
-
-#### Set S3 FileSystem
-
-Interaction with S3 happens via one of [Hadoop's S3 FileSystem 
clients](https://wiki.apache.org/hadoop/AmazonS3):
-
-1. `S3AFileSystem` (**recommended** for Hadoop 2.7 and later): file system for 
reading and writing regular files using Amazon's SDK internally. No maximum 
file size and works with IAM roles.
-2. `NativeS3FileSystem` (for Hadoop 2.6 and earlier): file system for reading 
and writing regular files. Maximum object size is 5GB and does not work with 
IAM roles.
-
-##### `S3AFileSystem` (Recommended)
-
-This is the recommended S3 FileSystem implementation to use. It uses Amazon's 
SDK internally and works with IAM roles (see [Configure Access 
Credentials](#configure-access-credentials-1)).
-
-You need to point Flink to a valid Hadoop configuration, which contains the 
following properties in `core-site.xml`:
-
-{% highlight xml %}
-<configuration>
-
-<property>
-  <name>fs.s3.impl</name>
-  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
-</property>
-
-<!-- Comma separated list of local directories used to buffer
-     large results prior to transmitting them to S3. -->
-<property>
-  <name>fs.s3a.buffer.dir</name>
-  <value>/tmp</value>
-</property>
-
-</configuration>
-{% endhighlight %}
-
-This registers `S3AFileSystem` as the default FileSystem for URIs with the 
`s3a://` scheme.
-
-##### `NativeS3FileSystem`
-
-This file system is limited to files up to 5GB in size and it does not work 
with IAM roles (see [Configure Access 
Credentials](#configure-access-credentials-1)), meaning that you have to 
manually configure your AWS credentials in the Hadoop config file.
-
-You need to point Flink to a valid Hadoop configuration, which contains the 
following property in `core-site.xml`:
-
-{% highlight xml %}
-<property>
-  <name>fs.s3.impl</name>
-  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
-</property>
-{% endhighlight %}
-
-This registers `NativeS3FileSystem` as the default FileSystem for URIs with 
the `s3://` scheme.
-
-{% top %}
-
-#### Hadoop Configuration
-
-You can specify the [Hadoop configuration](../config.html#hdfs) in various 
ways pointing Flink to
-the path of the Hadoop configuration directory, for example
-- by setting the environment variable `HADOOP_CONF_DIR`, or
-- by setting the `fs.hdfs.hadoopconf` configuration option in 
`flink-conf.yaml`:
-{% highlight yaml %}
-fs.hdfs.hadoopconf: /path/to/etc/hadoop
-{% endhighlight %}
-
-This registers `/path/to/etc/hadoop` as Hadoop's configuration directory with 
Flink. Flink will look for the `core-site.xml` and `hdfs-site.xml` files in the 
specified directory.
-
-{% top %}
-
-#### Configure Access Credentials
-
-{% panel **Note:** You don't have to configure this manually if you are 
running [Flink on EMR](#emr-elastic-mapreduce). %}
-
-After setting up the S3 FileSystem, you need to make sure that Flink is 
allowed to access your S3 buckets.
-
-##### Identity and Access Management (IAM) (Recommended)
-
-When using `S3AFileSystem`, the recommended way of setting up credentials on 
AWS is via [Identity and Access Management 
(IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You 
can use IAM features to securely give Flink instances the credentials that they 
need in order to access S3 buckets. Details about how to do this are beyond the 
scope of this documentation. Please refer to the AWS user guide. What you are 
looking for are [IAM Roles](http://docs.aws.amazon.com/ [...]
-
-If you set this up correctly, you can manage access to S3 within AWS and don't 
need to distribute any access keys to Flink.
-
-Note that this only works with `S3AFileSystem` and not `NativeS3FileSystem`.
-
-{% top %}
-
-##### Access Keys with `S3AFileSystem` (Discouraged)
-
-Access to S3 can be granted via your **access and secret key pair**. Please 
note that this is discouraged since the [introduction of IAM 
roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).
-
-For `S3AFileSystem` you need to configure both `fs.s3a.access.key` and 
`fs.s3a.secret.key`  in Hadoop's  `core-site.xml`:
-
-{% highlight xml %}
-<property>
-  <name>fs.s3a.access.key</name>
-  <value></value>
-</property>
-
-<property>
-  <name>fs.s3a.secret.key</name>
-  <value></value>
-</property>
-{% endhighlight %}
-
-{% top %}
-
-##### Access Keys with `NativeS3FileSystem` (Discouraged)
-
-Access to S3 can be granted via your **access and secret key pair**. But this 
is discouraged and you should use `S3AFileSystem` [with the required IAM 
roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).
-
-For `NativeS3FileSystem` you need to configure both `fs.s3.awsAccessKeyId` and 
`fs.s3.awsSecretAccessKey`  in Hadoop's  `core-site.xml`:
-
-{% highlight xml %}
-<property>
-  <name>fs.s3.awsAccessKeyId</name>
-  <value></value>
-</property>
-
-<property>
-  <name>fs.s3.awsSecretAccessKey</name>
-  <value></value>
-</property>
-{% endhighlight %}
-
-{% top %}
-
-#### Provide S3 FileSystem Dependency
-
-{% panel **Note:** You don't have to configure this manually if you are 
running [Flink on EMR](#emr-elastic-mapreduce). %}
-
-Hadoop's S3 FileSystem clients are packaged in the `hadoop-aws` artifact 
(Hadoop version 2.6 and later). This JAR and all its dependencies need to be 
added to Flink's classpath, i.e. the class path of both Job and TaskManagers. 
Depending on which FileSystem implementation and which Flink and Hadoop version 
you use, you need to provide different dependencies (see below).
-
-There are multiple ways of adding JARs to Flink's class path, the easiest 
being simply to drop the JARs in Flink's `lib` folder. You need to copy the 
`hadoop-aws` JAR with all its dependencies. You can also export the directory 
containing these JARs as part of the `HADOOP_CLASSPATH` environment variable on 
all machines.
-
-##### Flink for Hadoop 2.7
-
-Depending on which file system you use, please add the following dependencies. 
You can find these as part of the Hadoop binaries in 
`hadoop-2.7/share/hadoop/tools/lib`:
-
-- `S3AFileSystem`:
-  - `hadoop-aws-2.7.3.jar`
-  - `aws-java-sdk-s3-1.11.183.jar` and its dependencies:
-    - `aws-java-sdk-core-1.11.183.jar`
-    - `aws-java-sdk-kms-1.11.183.jar`
-    - `jackson-annotations-2.6.7.jar`
-    - `jackson-core-2.6.7.jar`
-    - `jackson-databind-2.6.7.jar`
-    - `joda-time-2.8.1.jar`
-    - `httpcore-4.4.4.jar`
-    - `httpclient-4.5.3.jar`
-
-- `NativeS3FileSystem`:
-  - `hadoop-aws-2.7.3.jar`
-  - `guava-11.0.2.jar`
-
-Note that `hadoop-common` is available as part of Flink, but Guava is shaded 
by Flink.
-
-##### Flink for Hadoop 2.6
-
-Depending on which file system you use, please add the following dependencies. 
You can find these as part of the Hadoop binaries in 
`hadoop-2.6/share/hadoop/tools/lib`:
-
-- `S3AFileSystem`:
-  - `hadoop-aws-2.6.4.jar`
-  - `aws-java-sdk-1.7.4.jar` and its dependencies:
-    - `jackson-annotations-2.1.1.jar`
-    - `jackson-core-2.1.1.jar`
-    - `jackson-databind-2.1.1.jar`
-    - `joda-time-2.2.jar`
-    - `httpcore-4.2.5.jar`
-    - `httpclient-4.2.5.jar`
-
-- `NativeS3FileSystem`:
-  - `hadoop-aws-2.6.4.jar`
-  - `guava-11.0.2.jar`
-
-Note that `hadoop-common` is available as part of Flink, but Guava is shaded 
by Flink.
-
-##### Flink for Hadoop 2.4 and earlier
-
-These Hadoop versions only have support for `NativeS3FileSystem`. This comes 
pre-packaged with Flink for Hadoop 2 as part of `hadoop-common`. You don't need 
to add anything to the classpath.
-
-{% top %}
-
-## Common Issues
-
-The following sections lists common issues when working with Flink on AWS.
-
-### Missing S3 FileSystem Configuration
-
-If your job submission fails with an Exception message noting that `No file 
system found with scheme s3` this means that no FileSystem has been configured 
for S3. Please check out the configuration sections for our [shaded 
Hadoop/Presto](#shaded-hadooppresto-s3-file-systems-recommended) or [generic 
Hadoop](#set-s3-filesystem) file systems for details on how to configure this 
properly.
-
-{% highlight plain %}
-org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed:
-  Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) 
[...]
-Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error:
-  No file system found with scheme s3, referenced in file URI 
's3://<bucket>/<endpoint>'. [...]
-Caused by: java.io.IOException: No file system found with scheme s3,
-  referenced in file URI 's3://<bucket>/<endpoint>'.
-    at o.a.f.core.fs.FileSystem.get(FileSystem.java:296)
-    at o.a.f.core.fs.Path.getFileSystem(Path.java:311)
-    at 
o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
-    at 
o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
-    at 
o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
-{% endhighlight %}
-
-{% top %}
-
-### AWS Access Key ID and Secret Access Key Not Specified
-
-If you see your job failing with an Exception noting that the `AWS Access Key 
ID and Secret Access Key must be specified as the username or password`, your 
access credentials have not been set up properly. Please refer to the access 
credential section for our [shaded 
Hadoop/Presto](#configure-access-credentials) or [generic 
Hadoop](#configure-access-credentials-1) file systems for details on how to 
configure this.
-
-{% highlight plain %}
-org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed:
-  Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) 
[...]
-Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) 
points to the
-  HDFS NameNode at <bucket>, but the File System could not be initialized with 
that address:
-  AWS Access Key ID and Secret Access Key must be specified as the username or 
password
-  (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId
-  or fs.s3n.awsSecretAccessKey properties (respectively) [...]
-Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret 
Access Key must
-  be specified as the username or password (respectively) of a s3 URL, or by 
setting
-  the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties 
(respectively) [...]
-    at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
-    at 
o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
-    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
-    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
-    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
-    at java.lang.reflect.Method.invoke(Method.java:606)
-    at 
o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
-    at 
o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
-    at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source)
-    at 
o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330)
-    at 
o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
-{% endhighlight %}
-
-{% top %}
-
-### ClassNotFoundException: NativeS3FileSystem/S3AFileSystem Not Found
-
-If you see this Exception, the S3 FileSystem is not part of the class path of 
Flink. Please refer to [S3 FileSystem dependency 
section](#provide-s3-filesystem-dependency) for details on how to configure 
this properly.
-
-{% highlight plain %}
-Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
-  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186)
-  at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
-  at 
org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
-  at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
-  at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
-  at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
-  at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
-  at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
-  ... 25 more
-Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
-  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154)
-  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178)
-  ... 32 more
-Caused by: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
-  at 
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
-  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152)
-  ... 33 more
-{% endhighlight %}
-
-{% top %}
-
-### IOException: `400: Bad Request`
-
-If you have configured everything properly, but get a `Bad Request` Exception 
**and** your S3 bucket is located in region `eu-central-1`, you might be 
running an S3 client, which does not support [Amazon's signature version 
4](http://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html).
-
-{% highlight plain %}
-[...]
-Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 
: Bad Request [...]
-Caused by: org.jets3t.service.impl.rest.HttpException [...]
-{% endhighlight %}
-or
-{% highlight plain %}
-com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS 
Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error 
Message: Bad Request, S3 Extended Request ID: [...]
-
-{% endhighlight %}
-
-This should not apply to our shaded Hadoop/Presto S3 file systems but can 
occur for Hadoop-provided
-S3 file systems. In particular, all Hadoop versions up to 2.7.2 running 
`NativeS3FileSystem` (which
-depend on `JetS3t 0.9.0` instead of a version [>= 
0.9.4](http://www.jets3t.org/RELEASE_NOTES.html))
-are affected but users also reported this happening with the `S3AFileSystem`.
-
-Except for changing the bucket region, you may also be able to solve this by
-[requesting signature version 4 for request 
authentication](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version),
-e.g. by adding this to Flink's JVM options in `flink-conf.yaml` (see
-[configuration](../config.html#common-options)):
-{% highlight yaml %}
-env.java.opts: -Dcom.amazonaws.services.s3.enableV4
-{% endhighlight %}
-
-{% top %}
-
-### NullPointerException at org.apache.hadoop.fs.LocalDirAllocator
-
-This Exception is usually caused by skipping the local buffer directory 
configuration `fs.s3a.buffer.dir` for the `S3AFileSystem`. Please refer to the 
[S3AFileSystem configuration](#s3afilesystem-recommended) section to see how to 
configure the `S3AFileSystem` properly.
-
-{% highlight plain %}
-[...]
-Caused by: java.lang.NullPointerException at
-o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
 at
-o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
 at
-o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
 at
-o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at
-o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at
-o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at
-o.a.h.fs.FileSystem.create(FileSystem.java:907) at
-o.a.h.fs.FileSystem.create(FileSystem.java:888) at
-o.a.h.fs.FileSystem.create(FileSystem.java:785) at
-o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at
-o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at
-... 25 more
-{% endhighlight %}
-
-{% top %}
diff --git a/docs/ops/deployment/aws.zh.md b/docs/ops/deployment/aws.zh.md
index ae43c40..b465340 100644
--- a/docs/ops/deployment/aws.zh.md
+++ b/docs/ops/deployment/aws.zh.md
@@ -63,390 +63,3 @@ HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m 
yarn-cluster -yn 1 examples/
 {% endhighlight %}
 
 {% top %}
-
-## S3: Simple Storage Service
-
-[Amazon Simple Storage Service](http://aws.amazon.com/s3/) (Amazon S3) 
provides cloud object storage for a variety of use cases. You can use S3 with 
Flink for **reading** and **writing data** as well in conjunction with the 
[streaming **state backends**]({{ site.baseurl}}/ops/state/state_backends.html) 
or even as a YARN object storage.
-
-You can use S3 objects like regular files by specifying paths in the following 
format:
-
-{% highlight plain %}
-s3://<your-bucket>/<endpoint>
-{% endhighlight %}
-
-The endpoint can either be a single file or a directory, for example:
-
-{% highlight java %}
-// Read from S3 bucket
-env.readTextFile("s3://<bucket>/<endpoint>");
-
-// Write to S3 bucket
-stream.writeAsText("s3://<bucket>/<endpoint>");
-
-// Use S3 as FsStatebackend
-env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));
-{% endhighlight %}
-
-Note that these examples are *not* exhaustive and you can use S3 in other 
places as well, including your [high availability 
setup](../jobmanager_high_availability.html) or the [RocksDBStateBackend]({{ 
site.baseurl }}/ops/state/state_backends.html#the-rocksdbstatebackend); 
everywhere that Flink expects a FileSystem URI.
-
-For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and 
`flink-s3-fs-presto` S3
-filesystem wrappers which are fairly easy to set up. For some cases, however, 
e.g. for using S3 as
-YARN's resource storage dir, it may be necessary to set up a specific Hadoop 
S3 FileSystem
-implementation. Both ways are described below.
-
-### Shaded Hadoop/Presto S3 file systems (recommended)
-
-{% panel **Note:** You don't have to configure this manually if you are 
running [Flink on EMR](#emr-elastic-mapreduce). %}
-
-To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the 
respective JAR file from the
-`opt` directory to the `lib` directory of your Flink distribution before 
starting Flink, e.g.
-
-{% highlight bash %}
-cp ./opt/flink-s3-fs-presto-{{ site.version }}.jar ./lib/
-{% endhighlight %}
-
-Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem
-wrappers for URIs with the `s3://` scheme, `flink-s3-fs-hadoop` also registers
-for `s3a://` and `flink-s3-fs-presto` also registers for `s3p://`, so you can
-use this to use both at the same time.
-
-#### Configure Access Credentials
-
-After setting up the S3 FileSystem wrapper, you need to make sure that Flink 
is allowed to access your S3 buckets.
-
-##### Identity and Access Management (IAM) (Recommended)
-
-The recommended way of setting up credentials on AWS is via [Identity and 
Access Management 
(IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You 
can use IAM features to securely give Flink instances the credentials that they 
need in order to access S3 buckets. Details about how to do this are beyond the 
scope of this documentation. Please refer to the AWS user guide. What you are 
looking for are [IAM 
Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam- [...]
-
-If you set this up correctly, you can manage access to S3 within AWS and don't 
need to distribute any access keys to Flink.
-
-##### Access Keys (Discouraged)
-
-Access to S3 can be granted via your **access and secret key pair**. Please 
note that this is discouraged since the [introduction of IAM 
roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).
-
-You need to configure both `s3.access-key` and `s3.secret-key`  in Flink's  
`flink-conf.yaml`:
-
-{% highlight yaml %}
-s3.access-key: your-access-key
-s3.secret-key: your-secret-key
-{% endhighlight %}
-
-{% top %}
-
-### Hadoop-provided S3 file systems - manual setup
-
-{% panel **Note:** You don't have to configure this manually if you are 
running [Flink on EMR](#emr-elastic-mapreduce). %}
-
-This setup is a bit more complex and we recommend using our shaded 
Hadoop/Presto file systems
-instead (see above) unless required otherwise, e.g. for using S3 as YARN's 
resource storage dir
-via the `fs.defaultFS` configuration property in Hadoop's `core-site.xml`.
-
-#### Set S3 FileSystem
-
-Interaction with S3 happens via one of [Hadoop's S3 FileSystem 
clients](https://wiki.apache.org/hadoop/AmazonS3):
-
-1. `S3AFileSystem` (**recommended** for Hadoop 2.7 and later): file system for 
reading and writing regular files using Amazon's SDK internally. No maximum 
file size and works with IAM roles.
-2. `NativeS3FileSystem` (for Hadoop 2.6 and earlier): file system for reading 
and writing regular files. Maximum object size is 5GB and does not work with 
IAM roles.
-
-##### `S3AFileSystem` (Recommended)
-
-This is the recommended S3 FileSystem implementation to use. It uses Amazon's 
SDK internally and works with IAM roles (see [Configure Access 
Credentials](#configure-access-credentials-1)).
-
-You need to point Flink to a valid Hadoop configuration, which contains the 
following properties in `core-site.xml`:
-
-{% highlight xml %}
-<configuration>
-
-<property>
-  <name>fs.s3.impl</name>
-  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
-</property>
-
-<!-- Comma separated list of local directories used to buffer
-     large results prior to transmitting them to S3. -->
-<property>
-  <name>fs.s3a.buffer.dir</name>
-  <value>/tmp</value>
-</property>
-
-</configuration>
-{% endhighlight %}
-
-This registers `S3AFileSystem` as the default FileSystem for URIs with the 
`s3a://` scheme.
-
-##### `NativeS3FileSystem`
-
-This file system is limited to files up to 5GB in size and it does not work 
with IAM roles (see [Configure Access 
Credentials](#configure-access-credentials-1)), meaning that you have to 
manually configure your AWS credentials in the Hadoop config file.
-
-You need to point Flink to a valid Hadoop configuration, which contains the 
following property in `core-site.xml`:
-
-{% highlight xml %}
-<property>
-  <name>fs.s3.impl</name>
-  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
-</property>
-{% endhighlight %}
-
-This registers `NativeS3FileSystem` as the default FileSystem for URIs with 
the `s3://` scheme.
-
-{% top %}
-
-#### Hadoop Configuration
-
-You can specify the [Hadoop configuration](../config.html#hdfs) in various 
ways pointing Flink to
-the path of the Hadoop configuration directory, for example
-- by setting the environment variable `HADOOP_CONF_DIR`, or
-- by setting the `fs.hdfs.hadoopconf` configuration option in 
`flink-conf.yaml`:
-{% highlight yaml %}
-fs.hdfs.hadoopconf: /path/to/etc/hadoop
-{% endhighlight %}
-
-This registers `/path/to/etc/hadoop` as Hadoop's configuration directory with 
Flink. Flink will look for the `core-site.xml` and `hdfs-site.xml` files in the 
specified directory.
-
-{% top %}
-
-#### Configure Access Credentials
-
-{% panel **Note:** You don't have to configure this manually if you are 
running [Flink on EMR](#emr-elastic-mapreduce). %}
-
-After setting up the S3 FileSystem, you need to make sure that Flink is 
allowed to access your S3 buckets.
-
-##### Identity and Access Management (IAM) (Recommended)
-
-When using `S3AFileSystem`, the recommended way of setting up credentials on 
AWS is via [Identity and Access Management 
(IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You 
can use IAM features to securely give Flink instances the credentials that they 
need in order to access S3 buckets. Details about how to do this are beyond the 
scope of this documentation. Please refer to the AWS user guide. What you are 
looking for are [IAM Roles](http://docs.aws.amazon.com/ [...]
-
-If you set this up correctly, you can manage access to S3 within AWS and don't 
need to distribute any access keys to Flink.
-
-Note that this only works with `S3AFileSystem` and not `NativeS3FileSystem`.
-
-{% top %}
-
-##### Access Keys with `S3AFileSystem` (Discouraged)
-
-Access to S3 can be granted via your **access and secret key pair**. Please 
note that this is discouraged since the [introduction of IAM 
roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).
-
-For `S3AFileSystem` you need to configure both `fs.s3a.access.key` and 
`fs.s3a.secret.key`  in Hadoop's  `core-site.xml`:
-
-{% highlight xml %}
-<property>
-  <name>fs.s3a.access.key</name>
-  <value></value>
-</property>
-
-<property>
-  <name>fs.s3a.secret.key</name>
-  <value></value>
-</property>
-{% endhighlight %}
-
-{% top %}
-
-##### Access Keys with `NativeS3FileSystem` (Discouraged)
-
-Access to S3 can be granted via your **access and secret key pair**. But this 
is discouraged and you should use `S3AFileSystem` [with the required IAM 
roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2).
-
-For `NativeS3FileSystem` you need to configure both `fs.s3.awsAccessKeyId` and 
`fs.s3.awsSecretAccessKey`  in Hadoop's  `core-site.xml`:
-
-{% highlight xml %}
-<property>
-  <name>fs.s3.awsAccessKeyId</name>
-  <value></value>
-</property>
-
-<property>
-  <name>fs.s3.awsSecretAccessKey</name>
-  <value></value>
-</property>
-{% endhighlight %}
-
-{% top %}
-
-#### Provide S3 FileSystem Dependency
-
-{% panel **Note:** You don't have to configure this manually if you are 
running [Flink on EMR](#emr-elastic-mapreduce). %}
-
-Hadoop's S3 FileSystem clients are packaged in the `hadoop-aws` artifact 
(Hadoop version 2.6 and later). This JAR and all its dependencies need to be 
added to Flink's classpath, i.e. the class path of both Job and TaskManagers. 
Depending on which FileSystem implementation and which Flink and Hadoop version 
you use, you need to provide different dependencies (see below).
-
-There are multiple ways of adding JARs to Flink's class path, the easiest 
being simply to drop the JARs in Flink's `lib` folder. You need to copy the 
`hadoop-aws` JAR with all its dependencies. You can also export the directory 
containing these JARs as part of the `HADOOP_CLASSPATH` environment variable on 
all machines.
-
-##### Flink for Hadoop 2.7
-
-Depending on which file system you use, please add the following dependencies. 
You can find these as part of the Hadoop binaries in 
`hadoop-2.7/share/hadoop/tools/lib`:
-
-- `S3AFileSystem`:
-  - `hadoop-aws-2.7.3.jar`
-  - `aws-java-sdk-s3-1.11.183.jar` and its dependencies:
-    - `aws-java-sdk-core-1.11.183.jar`
-    - `aws-java-sdk-kms-1.11.183.jar`
-    - `jackson-annotations-2.6.7.jar`
-    - `jackson-core-2.6.7.jar`
-    - `jackson-databind-2.6.7.jar`
-    - `joda-time-2.8.1.jar`
-    - `httpcore-4.4.4.jar`
-    - `httpclient-4.5.3.jar`
-
-- `NativeS3FileSystem`:
-  - `hadoop-aws-2.7.3.jar`
-  - `guava-11.0.2.jar`
-
-Note that `hadoop-common` is available as part of Flink, but Guava is shaded 
by Flink.
-
-##### Flink for Hadoop 2.6
-
-Depending on which file system you use, please add the following dependencies. 
You can find these as part of the Hadoop binaries in 
`hadoop-2.6/share/hadoop/tools/lib`:
-
-- `S3AFileSystem`:
-  - `hadoop-aws-2.6.4.jar`
-  - `aws-java-sdk-1.7.4.jar` and its dependencies:
-    - `jackson-annotations-2.1.1.jar`
-    - `jackson-core-2.1.1.jar`
-    - `jackson-databind-2.1.1.jar`
-    - `joda-time-2.2.jar`
-    - `httpcore-4.2.5.jar`
-    - `httpclient-4.2.5.jar`
-
-- `NativeS3FileSystem`:
-  - `hadoop-aws-2.6.4.jar`
-  - `guava-11.0.2.jar`
-
-Note that `hadoop-common` is available as part of Flink, but Guava is shaded 
by Flink.
-
-##### Flink for Hadoop 2.4 and earlier
-
-These Hadoop versions only have support for `NativeS3FileSystem`. This comes 
pre-packaged with Flink for Hadoop 2 as part of `hadoop-common`. You don't need 
to add anything to the classpath.
-
-{% top %}
-
-## Common Issues
-
-The following sections lists common issues when working with Flink on AWS.
-
-### Missing S3 FileSystem Configuration
-
-If your job submission fails with an Exception message noting that `No file 
system found with scheme s3` this means that no FileSystem has been configured 
for S3. Please check out the configuration sections for our [shaded 
Hadoop/Presto](#shaded-hadooppresto-s3-file-systems-recommended) or [generic 
Hadoop](#set-s3-filesystem) file systems for details on how to configure this 
properly.
-
-{% highlight plain %}
-org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed:
-  Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) 
[...]
-Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error:
-  No file system found with scheme s3, referenced in file URI 
's3://<bucket>/<endpoint>'. [...]
-Caused by: java.io.IOException: No file system found with scheme s3,
-  referenced in file URI 's3://<bucket>/<endpoint>'.
-    at o.a.f.core.fs.FileSystem.get(FileSystem.java:296)
-    at o.a.f.core.fs.Path.getFileSystem(Path.java:311)
-    at 
o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
-    at 
o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
-    at 
o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
-{% endhighlight %}
-
-{% top %}
-
-### AWS Access Key ID and Secret Access Key Not Specified
-
-If you see your job failing with an Exception noting that the `AWS Access Key 
ID and Secret Access Key must be specified as the username or password`, your 
access credentials have not been set up properly. Please refer to the access 
credential section for our [shaded 
Hadoop/Presto](#configure-access-credentials) or [generic 
Hadoop](#configure-access-credentials-1) file systems for details on how to 
configure this.
-
-{% highlight plain %}
-org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed:
-  Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) 
[...]
-Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) 
points to the
-  HDFS NameNode at <bucket>, but the File System could not be initialized with 
that address:
-  AWS Access Key ID and Secret Access Key must be specified as the username or 
password
-  (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId
-  or fs.s3n.awsSecretAccessKey properties (respectively) [...]
-Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret 
Access Key must
-  be specified as the username or password (respectively) of a s3 URL, or by 
setting
-  the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties 
(respectively) [...]
-    at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
-    at 
o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
-    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
-    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
-    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
-    at java.lang.reflect.Method.invoke(Method.java:606)
-    at 
o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
-    at 
o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
-    at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source)
-    at 
o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330)
-    at 
o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
-{% endhighlight %}
-
-{% top %}
-
-### ClassNotFoundException: NativeS3FileSystem/S3AFileSystem Not Found
-
-If you see this Exception, the S3 FileSystem is not part of the class path of 
Flink. Please refer to [S3 FileSystem dependency 
section](#provide-s3-filesystem-dependency) for details on how to configure 
this properly.
-
-{% highlight plain %}
-Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
-  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186)
-  at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
-  at 
org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
-  at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
-  at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
-  at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
-  at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
-  at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
-  ... 25 more
-Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
-  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154)
-  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178)
-  ... 32 more
-Caused by: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
-  at 
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
-  at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152)
-  ... 33 more
-{% endhighlight %}
-
-{% top %}
-
-### IOException: `400: Bad Request`
-
-If you have configured everything properly, but get a `Bad Request` Exception 
**and** your S3 bucket is located in region `eu-central-1`, you might be 
running an S3 client, which does not support [Amazon's signature version 
4](http://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html).
-
-{% highlight plain %}
-[...]
-Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 
: Bad Request [...]
-Caused by: org.jets3t.service.impl.rest.HttpException [...]
-{% endhighlight %}
-or
-{% highlight plain %}
-com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS 
Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error 
Message: Bad Request, S3 Extended Request ID: [...]
-
-{% endhighlight %}
-
-This should not apply to our shaded Hadoop/Presto S3 file systems but can 
occur for Hadoop-provided
-S3 file systems. In particular, all Hadoop versions up to 2.7.2 running 
`NativeS3FileSystem` (which
-depend on `JetS3t 0.9.0` instead of a version [>= 
0.9.4](http://www.jets3t.org/RELEASE_NOTES.html))
-are affected but users also reported this happening with the `S3AFileSystem`.
-
-Except for changing the bucket region, you may also be able to solve this by
-[requesting signature version 4 for request 
authentication](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version),
-e.g. by adding this to Flink's JVM options in `flink-conf.yaml` (see
-[configuration](../config.html#common-options)):
-{% highlight yaml %}
-env.java.opts: -Dcom.amazonaws.services.s3.enableV4
-{% endhighlight %}
-
-{% top %}
-
-### NullPointerException at org.apache.hadoop.fs.LocalDirAllocator
-
-This Exception is usually caused by skipping the local buffer directory 
configuration `fs.s3a.buffer.dir` for the `S3AFileSystem`. Please refer to the 
[S3AFileSystem configuration](#s3afilesystem-recommended) section to see how to 
configure the `S3AFileSystem` properly.
-
-{% highlight plain %}
-[...]
-Caused by: java.lang.NullPointerException at
-o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
 at
-o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
 at
-o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
 at
-o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at
-o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at
-o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at
-o.a.h.fs.FileSystem.create(FileSystem.java:907) at
-o.a.h.fs.FileSystem.create(FileSystem.java:888) at
-o.a.h.fs.FileSystem.create(FileSystem.java:785) at
-o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at
-o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at
-... 25 more
-{% endhighlight %}
-
-{% top %}
diff --git a/docs/ops/filesystems/common.md b/docs/ops/filesystems/common.md
index edef1dd..a8a371e 100644
--- a/docs/ops/filesystems/common.md
+++ b/docs/ops/filesystems/common.md
@@ -22,14 +22,14 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Flink provides a number of common configuration settings that work 
across all file system implementations. 
+Apache Flink provides several standard configuration settings that work across 
all file system implementations. 
 
 * This will be replaced by the TOC
 {:toc}
 
 ## Default File System
 
-If paths to files do not explicitly specify a file system scheme (and 
authority), a default scheme (and authority) will be used.
+A default scheme (and authority) is used if paths to files do not explicitly 
specify a file system scheme (and authority).
 
 {% highlight yaml %}
 fs.default-scheme: <default-fs>
@@ -40,10 +40,10 @@ For example, if the default file system configured as 
`fs.default-scheme: hdfs:/
 
 ## Connection limiting
 
-You can limit the total number of connections that a file system can 
concurrently open. This is useful when the file system cannot handle a large 
number
-of concurrent reads / writes or open connections at the same time.
+You can limit the total number of connections that a file system can concurrently open, which is useful when the file system cannot handle a large number
+of concurrent reads/writes or open connections at the same time.
 
-For example, very small HDFS clusters with few RPC handlers can sometimes be 
overwhelmed by a large Flink job trying to build up many connections during a 
checkpoint.
+For example, small HDFS clusters with few RPC handlers can sometimes be 
overwhelmed by a large Flink job trying to build up many connections during a 
checkpoint.
 
 To limit a specific file system's connections, add the following entries to 
the Flink configuration. The file system to be limited is identified by
 its scheme.
@@ -56,13 +56,14 @@ fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
 fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
 {% endhighlight %}
 
-You can limit the number if input/output connections (streams) separately 
(`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a 
limit on
-the total number of concurrent streams (`fs.<scheme>.limit.total`). If the 
file system tries to open more streams, the operation will block until some 
streams are closed.
-If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, 
the stream opening will fail.
+You can limit the number of input/output connections (streams) separately 
(`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a 
limit on
+the total number of concurrent streams (`fs.<scheme>.limit.total`). If the 
file system tries to open more streams, the operation blocks until some streams 
close.
+If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, 
the stream opening fails.
 
-To prevent inactive streams from taking up the complete pool (preventing new 
connections to be opened), you can add an inactivity timeout for streams:
-`fs.<scheme>.limit.stream-timeout`. If a stream does not read/write any bytes 
for at least that amount of time, it is forcibly closed.
+To prevent inactive streams from taking up the full pool (preventing new connections from being opened), you can add an inactivity timeout, `fs.<scheme>.limit.stream-timeout`, which forcibly closes a stream that does not read or write any bytes for at least that amount of time.
 
-These limits are enforced per TaskManager, so each TaskManager in a Flink 
application or cluster will open up to that number of connections.
-In addition, the limits are also only enforced per FileSystem instance. 
Because File Systems are created per scheme and authority, different
-authorities will have their own connection pool. For example 
`hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
\ No newline at end of file
+Limit enforcement occurs on a per TaskManager/file system basis.
+Because file system creation occurs per scheme and authority, different
+authorities have independent connection pools. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
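+
+As a sketch, limiting connections for the `s3` scheme (the numbers below are purely illustrative) would look like this in `flink-conf.yaml`:
+
+{% highlight yaml %}
+fs.s3.limit.total: 50
+fs.s3.limit.timeout: 30000
+fs.s3.limit.stream-timeout: 60000
+{% endhighlight %}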
+
+{% top %}
\ No newline at end of file
diff --git a/docs/ops/filesystems/common.zh.md 
b/docs/ops/filesystems/common.zh.md
index edef1dd..a8a371e 100644
--- a/docs/ops/filesystems/common.zh.md
+++ b/docs/ops/filesystems/common.zh.md
@@ -22,14 +22,14 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Flink provides a number of common configuration settings that work 
across all file system implementations. 
+Apache Flink provides several standard configuration settings that work across 
all file system implementations. 
 
 * This will be replaced by the TOC
 {:toc}
 
 ## Default File System
 
-If paths to files do not explicitly specify a file system scheme (and 
authority), a default scheme (and authority) will be used.
+A default scheme (and authority) is used if paths to files do not explicitly 
specify a file system scheme (and authority).
 
 {% highlight yaml %}
 fs.default-scheme: <default-fs>
@@ -40,10 +40,10 @@ For example, if the default file system configured as 
`fs.default-scheme: hdfs:/
 
 ## Connection limiting
 
-You can limit the total number of connections that a file system can 
concurrently open. This is useful when the file system cannot handle a large 
number
-of concurrent reads / writes or open connections at the same time.
+You can limit the total number of connections that a file system can concurrently open, which is useful when the file system cannot handle a large number
+of concurrent reads/writes or open connections at the same time.
 
-For example, very small HDFS clusters with few RPC handlers can sometimes be 
overwhelmed by a large Flink job trying to build up many connections during a 
checkpoint.
+For example, small HDFS clusters with few RPC handlers can sometimes be 
overwhelmed by a large Flink job trying to build up many connections during a 
checkpoint.
 
 To limit a specific file system's connections, add the following entries to 
the Flink configuration. The file system to be limited is identified by
 its scheme.
@@ -56,13 +56,14 @@ fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
 fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
 {% endhighlight %}
 
-You can limit the number if input/output connections (streams) separately 
(`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a 
limit on
-the total number of concurrent streams (`fs.<scheme>.limit.total`). If the 
file system tries to open more streams, the operation will block until some 
streams are closed.
-If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, 
the stream opening will fail.
+You can limit the number of input/output connections (streams) separately 
(`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a 
limit on
+the total number of concurrent streams (`fs.<scheme>.limit.total`). If the 
file system tries to open more streams, the operation blocks until some streams 
close.
+If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, 
the stream opening fails.
 
-To prevent inactive streams from taking up the complete pool (preventing new 
connections to be opened), you can add an inactivity timeout for streams:
-`fs.<scheme>.limit.stream-timeout`. If a stream does not read/write any bytes 
for at least that amount of time, it is forcibly closed.
+To prevent inactive streams from taking up the full pool (preventing new connections from being opened), you can add an inactivity timeout, `fs.<scheme>.limit.stream-timeout`, which forcibly closes a stream that does not read or write any bytes for at least that amount of time.
 
-These limits are enforced per TaskManager, so each TaskManager in a Flink 
application or cluster will open up to that number of connections.
-In addition, the limits are also only enforced per FileSystem instance. 
Because File Systems are created per scheme and authority, different
-authorities will have their own connection pool. For example 
`hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
\ No newline at end of file
+Limit enforcement occurs on a per TaskManager/file system basis.
+Because file system creation occurs per scheme and authority, different
+authorities have independent connection pools. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
+
+{% top %}
\ No newline at end of file
diff --git a/docs/ops/filesystems/index.md b/docs/ops/filesystems/index.md
index be8bbee..b8548c1 100644
--- a/docs/ops/filesystems/index.md
+++ b/docs/ops/filesystems/index.md
@@ -24,40 +24,37 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Flink uses file system for both ingest and output of data for streaming 
and batch applications as well as targets for checkpoint storage. 
-These file systems can be local such as *Unix*, distributed like *HDFS*, or 
even object stores such as *S3*.
+Apache Flink uses file systems to consume and persistently store data, both 
for the results of applications and for fault tolerance and recovery.
+These include some of the most popular file systems: *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, and *Aliyun OSS*.
 
 The file system used for a particular file is determined by its URI scheme.
 For example, `file:///home/user/text.txt` refers to a file in the local file 
system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a 
specific HDFS cluster.
 
-FileSystem instances are instantiated once per process and then cached / 
pooled, to
-avoid configuration overhead per stream creation and to enforce certain 
constraints, such as connection/stream limits.
+File system instances are instantiated once per process and then 
cached/pooled, to avoid configuration overhead per stream creation and to 
enforce certain constraints, such as connection/stream limits.
 
 * This will be replaced by the TOC
 {:toc}
 
 ### Built-in File Systems
 
-Flink ships with support for most of the popular file systems, including 
*local*, *hadoop-compatible*, *S3*, *MapR FS* and *OpenStack Swift FS*.
-Each is identified by the scheme included in the URI of the provide file path. 
-
 Flink ships with implementations for the following file systems:
 
-  - **local**: This file system is used when the scheme is *"file://"*, and it 
represents the file system of the local machine, including any NFS or SAN that 
is mounted into that local file system.
+  - **local**: This file system is used when the scheme is *"file://"*, and it 
represents the file system of the local machine, including any NFS or SAN 
drives mounted into that local file system.
 
   - **S3**: Flink directly provides file systems to talk to Amazon S3 with two 
alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. 
Both implementations are self-contained with no dependency footprint.
     
   - **MapR FS**: The MapR file system *"maprfs://"* is automatically available 
when the MapR libraries are in the classpath.
   
   - **OpenStack Swift FS**: Flink directly provides a file system to talk to 
the OpenStack Swift file system, registered under the scheme *"swift://"*. 
-  The implementation `flink-swift-fs-hadoop` is based on the [Hadoop 
Project](https://hadoop.apache.org/) but is self-contained with no dependency 
footprint.
+  The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop 
Project](https://hadoop.apache.org/) but is self-contained with no dependency 
footprint.
   To use it when using Flink as a library, add the respective maven dependency 
(`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`
   When starting a Flink application from the Flink binaries, copy or move the 
respective jar file from the `opt` folder to the `lib` folder.
 
 #### HDFS and Hadoop File System support 
 
-For all schemes where Flink cannot find a directly supported file system, it 
will fall back to Hadoop.
-All Hadoop file systems are automatically available when `flink-runtime` and 
the Hadoop libraries are in classpath.
+For all schemes where Flink cannot find a directly supported file system, it 
falls back to Hadoop.
+All Hadoop file systems are automatically available when `flink-runtime` and 
the Hadoop libraries are on the classpath.
+
 
 This way, Flink seamlessly supports all of Hadoop file systems, and all 
Hadoop-compatible file systems (HCFS).
 
@@ -67,12 +64,25 @@ This way, Flink seamlessly supports all of Hadoop file 
systems, and all Hadoop-c
   - **har**
   - ...
 
+##### Hadoop Configuration
+
+We recommend using Flink's built-in file systems unless required otherwise, e.g., for using that file system as YARN's resource storage dir via the `fs.defaultFS` configuration property in Hadoop's `core-site.xml`.
+
+If using a Hadoop file system, you can specify the 
[configuration](../config.html#hdfs) by setting the environment variable 
`HADOOP_CONF_DIR`, or by setting the `fs.hdfs.hadoopconf` configuration option 
in `flink-conf.yaml`. 
+
+{% highlight yaml %}
+fs.hdfs.hadoopconf: /path/to/etc/hadoop
+{% endhighlight %}
+
+This registers `/path/to/etc/hadoop` as Hadoop's configuration directory, which is where Flink will look for the `core-site.xml` and `hdfs-site.xml` files.
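+
+The `HADOOP_CONF_DIR` alternative is a single environment variable; as a sketch, reusing the same example path:
+
+{% highlight bash %}
+export HADOOP_CONF_DIR=/path/to/etc/hadoop
+{% endhighlight %}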
+
+
 ## Adding new File System Implementations
 
 File systems are represented via the `org.apache.flink.core.fs.FileSystem` 
class, which captures the ways to access and modify files and objects in that 
file system. 
-Implementations are discovered by Flink through Java's service abstraction, 
making it easy to add additional file system implementations.
+Implementations are discovered by Flink through Java's service abstraction, 
making it easy to add new file system implementations.
 
-In order to add a new File System, the following steps are needed:
+To add a new file system:
 
   - Add the File System implementation, which is a subclass of 
`org.apache.flink.core.fs.FileSystem`.
   - Add a factory that instantiates that file system and declares the scheme 
under which the FileSystem is registered. This must be a subclass of 
`org.apache.flink.core.fs.FileSystemFactory`.
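+
+Because factories are discovered through Java's service loader, the implementation also needs a provider-configuration file named `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` on its classpath, containing the factory's fully qualified class name. A minimal sketch, using a purely hypothetical class name:
+
+{% highlight plain %}
+com.example.fs.MyFileSystemFactory
+{% endhighlight %}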
diff --git a/docs/ops/filesystems/index.zh.md b/docs/ops/filesystems/index.zh.md
index 9390239..7f2de4e 100644
--- a/docs/ops/filesystems/index.zh.md
+++ b/docs/ops/filesystems/index.zh.md
@@ -24,66 +24,39 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page provides details on setting up and configuring different file 
systems for use with Flink.
-We start by describing how to use and configure the different file systems 
that are supported by Flink
-out-of-the-box, before describing the necessary steps in order to add support 
about other/custom file system
-implementations.
+Apache Flink uses file systems to consume and persistently store data, both 
for the results of applications and for fault tolerance and recovery.
+These include some of the most popular file systems: *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, and *Aliyun OSS*.
 
-## Flink's File System support
+The file system used for a particular file is determined by its URI scheme.
+For example, `file:///home/user/text.txt` refers to a file in the local file 
system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a 
specific HDFS cluster.
 
-Flink uses file systems both as *sources* and *sinks* in streaming/batch 
applications and as a target for *checkpointing*.
-These file systems can for example be *Unix/Windows file systems*, *HDFS*, or 
even object stores like *S3*.
+File system instances are instantiated once per process and then 
cached/pooled, to avoid configuration overhead per stream creation and to 
enforce certain constraints, such as connection/stream limits.
 
-The file system used for a specific file is determined by the file URI's 
scheme. For example `file:///home/user/text.txt` refers to
-a file in the local file system, while 
`hdfs://namenode:50010/data/user/text.txt` refers to a file in a specific HDFS 
cluster.
-
-File systems are represented via the `org.apache.flink.core.fs.FileSystem` 
class, which captures the ways to access and modify
-files and objects in that file system. FileSystem instances are instantiated 
once per process and then cached / pooled, to
-avoid configuration overhead per stream creation and to enforce certain 
constraints, such as connection/stream limits.
+* This will be replaced by the TOC
+{:toc}
 
 ### Built-in File Systems
 
-Flink ships with support for most of the popular file systems, namely *local*, 
*hadoop-compatible*, *S3*, *MapR FS*
-and *OpenStack Swift FS*. Each of these is identified by the scheme included 
in the URI of the provide file path. 
-
 Flink ships with implementations for the following file systems:
 
-  - **local**: This file system is used when the scheme is *"file://"*, and it 
represents the file system of the local machine, 
-including any NFS or SAN that is mounted into that local file system.
-
-  - **S3**: Flink directly provides file systems to talk to Amazon S3. There 
are two alternative implementations, `flink-s3-fs-presto`
-    and `flink-s3-fs-hadoop`. Both implementations are self-contained with no 
dependency footprint. There is no need to add Hadoop to
-    the classpath to use them. Both internally use some Hadoop code, but 
"shade away" all classes to avoid any dependency conflicts.
-
-    - `flink-s3-fs-presto`, registered under the scheme *"s3://"* and 
*"s3p://"*, is based on code from the [Presto project](https://prestodb.io/).
-      You can configure it the same way you can [configure the Presto file 
system](https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration).
-      
-    - `flink-s3-fs-hadoop`, registered under *"s3://"* and *"s3a://"*, based 
on code from the [Hadoop Project](https://hadoop.apache.org/).
-      The file system can be [configured exactly like Hadoop's 
s3a](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A).
+  - **local**: This file system is used when the scheme is *"file://"*, and it 
represents the file system of the local machine, including any NFS or SAN 
drives mounted into that local file system.
 
-    To use those file systems when using Flink as a library, add the 
respective maven dependency (`org.apache.flink:flink-s3-fs-presto:{{ 
site.version }}`
-    or `org.apache.flink:flink-s3-fs-hadoop:{{ site.version }}`). When 
starting a Flink application from the Flink binaries, copy or move
-    the respective jar file from the `opt` folder to the `lib` folder. See 
also [AWS setup](deployment/aws.html) for additional details.
-    
-    <span class="label label-danger">Attention</span>: As described above, 
both Hadoop and Presto "listen" to paths with scheme set to *"s3://"*. This is 
-    convenient for switching between implementations (Hadoop or Presto), but 
it can lead to non-determinism when both
-    implementations are required. This can happen when, for example, the job 
uses the [StreamingFileSink]({{ 
site.baseurl}}/dev/connectors/streamfile_sink.html) 
-    which only supports Hadoop, but uses Presto for checkpointing. In this 
case, it is advised to use explicitly *"s3a://"* 
-    as a scheme for the sink (Hadoop) and *"s3p://"* for checkpointing 
(Presto).
+  - **S3**: Flink directly provides file systems to talk to Amazon S3 with two 
alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. 
Both implementations are self-contained with no dependency footprint.
     
   - **MapR FS**: The MapR file system *"maprfs://"* is automatically available 
when the MapR libraries are in the classpath.
   
   - **OpenStack Swift FS**: Flink directly provides a file system to talk to 
the OpenStack Swift file system, registered under the scheme *"swift://"*. 
-  The implementation `flink-swift-fs-hadoop` is based on the [Hadoop 
Project](https://hadoop.apache.org/) but is self-contained with no dependency 
footprint.
+  The `flink-swift-fs-hadoop` implementation is based on the [Hadoop 
Project](https://hadoop.apache.org/) but is self-contained with no dependency 
footprint.
   To use it when using Flink as a library, add the respective maven dependency 
(`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`
   When starting a Flink application from the Flink binaries, copy or move the 
respective jar file from the `opt` folder to the `lib` folder.
 
 #### HDFS and Hadoop File System support 
 
-For all schemes where it cannot find a directly supported file system, Flink 
will try to use Hadoop to instantiate a file system for the respective scheme.
-All Hadoop file systems are automatically available once `flink-runtime` and 
the Hadoop libraries are in classpath.
+For all schemes where Flink cannot find a directly supported file system, it 
falls back to Hadoop to instantiate a file system for the respective scheme.
+All Hadoop file systems are automatically available when `flink-runtime` and 
the Hadoop libraries are on the classpath.
+
 
-That way, Flink seamlessly supports all of Hadoop file systems, and all 
Hadoop-compatible file systems (HCFS), for example:
+This way, Flink seamlessly supports all Hadoop file systems and all 
Hadoop-compatible file systems (HCFS), for example:
 
   - **hdfs**
   - **ftp**
@@ -91,84 +64,25 @@ That way, Flink seamlessly supports all of Hadoop file 
systems, and all Hadoop-c
   - **har**
   - ...
 
+##### Hadoop Configuration
 
-## Common File System configurations
-
-The following configuration settings exist across different file systems.
-
-#### Default File System
-
-If paths to files do not explicitly specify a file system scheme (and 
authority), a default scheme (and authority) will be used.
-
-{% highlight yaml %}
-fs.default-scheme: <default-fs>
-{% endhighlight %}
-
-For example, if the default file system configured as `fs.default-scheme: 
hdfs://localhost:9000/`, then a file path of
-`/user/hugo/in.txt` is interpreted as `hdfs://localhost:9000/user/hugo/in.txt`.
-
-#### Connection limiting
-
-You can limit the total number of connections that a file system can 
concurrently open. This is useful when the file system cannot handle a large 
number
-of concurrent reads / writes or open connections at the same time.
+We recommend using Flink's built-in file systems unless required otherwise, 
e.g., when using that file system as YARN's resource storage dir via the 
`fs.defaultFS` configuration property in Hadoop's `core-site.xml`.
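
Such a requirement typically shows up as an entry like the following in Hadoop's 
`core-site.xml`; this is only an illustrative sketch, and the scheme and bucket 
are placeholders:

{% highlight xml %}
<!-- core-site.xml: make a non-HDFS file system the default,
     e.g. so that YARN stores its resources there -->
<property>
  <name>fs.defaultFS</name>
  <value>s3a://<your-bucket>/</value>
</property>
{% endhighlight %}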
 
-For example, very small HDFS clusters with few RPC handlers can sometimes be 
overwhelmed by a large Flink job trying to build up many connections during a 
checkpoint.
-
-To limit a specific file system's connections, add the following entries to 
the Flink configuration. The file system to be limited is identified by
-its scheme.
+If using a Hadoop file system, you can specify the 
[configuration](../config.html#hdfs) by setting the environment variable 
`HADOOP_CONF_DIR`, or by setting the `fs.hdfs.hadoopconf` configuration option 
in `flink-conf.yaml`. 
 
 {% highlight yaml %}
-fs.<scheme>.limit.total: (number, 0/-1 mean no limit)
-fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
-fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
-fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
-fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
+fs.hdfs.hadoopconf: /path/to/etc/hadoop
 {% endhighlight %}
 
-You can limit the number if input/output connections (streams) separately 
(`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a 
limit on
-the total number of concurrent streams (`fs.<scheme>.limit.total`). If the 
file system tries to open more streams, the operation will block until some 
streams are closed.
-If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, 
the stream opening will fail.
-
-To prevent inactive streams from taking up the complete pool (preventing new 
connections to be opened), you can add an inactivity timeout for streams:
-`fs.<scheme>.limit.stream-timeout`. If a stream does not read/write any bytes 
for at least that amount of time, it is forcibly closed.
-
-These limits are enforced per TaskManager, so each TaskManager in a Flink 
application or cluster will open up to that number of connections.
-In addition, the limits are also only enforced per FileSystem instance. 
Because File Systems are created per scheme and authority, different
-authorities will have their own connection pool. For example 
`hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
-
-## Entropy injection for S3 file systems
-
-The bundled S3 file systems (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) 
support entropy injection. Entropy injection is
-a technique to improve scalability of AWS S3 buckets through adding some 
random characters near the beginning of the key.
-
-If entropy injection is activated, a configured substring in the paths will be 
replaced by random characters. For example, path
-`s3://my-bucket/checkpoints/_entropy_/dashboard-job/` would be replaced by 
something like `s3://my-bucket/checkpoints/gf36ikvg/dashboard-job/`.
-
-**Note that this only happens when the file creation passes the option to 
inject entropy!**, otherwise the file path will
-simply remove the entropy key substring. See
-[FileSystem.create(Path, 
WriteOption)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-)
-for details.
-
-*Note: The Flink runtime currently passes the option to inject entropy only to 
checkpoint data files.*
-*All other files, including checkpoint metadata and external URI do not inject 
entropy, to keep checkpoint URIs predictable.*
-
-To enable entropy injection, configure the *entropy key* and the *entropy 
length* parameters.
-
-```
-s3.entropy.key: _entropy_
-s3.entropy.length: 4 (default)
-
-```
+This registers `/path/to/etc/hadoop` as Hadoop's configuration directory, 
where Flink will look for the `core-site.xml` and `hdfs-site.xml` files.
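
Alternatively, the same directory can be supplied through the `HADOOP_CONF_DIR` 
environment variable mentioned above; a minimal sketch (the path is an example):

{% highlight bash %}
# Point Flink to the directory containing core-site.xml and hdfs-site.xml
export HADOOP_CONF_DIR=/path/to/etc/hadoop
{% endhighlight %}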
 
-The `s3.entropy.key` defines the string in paths that is replaced by the 
random characters. Paths that do not contain the entropy key are left unchanged.
-If a file system operation does not pass the *"inject entropy"* write option, 
the entropy key substring is simply removed.
-The `s3.entropy.length` defined the number of random alphanumeric characters 
to replace the entropy key with.
 
 ## Adding new File System Implementations
 
-File system implementations are discovered by Flink through Java's service 
abstraction, making it easy to add additional file system implementations.
+File systems are represented via the `org.apache.flink.core.fs.FileSystem` 
class, which captures the ways to access and modify files and objects in that 
file system. 
+Implementations are discovered by Flink through Java's service abstraction, 
making it easy to add new file system implementations.
 
-In order to add a new File System, the following steps are needed:
+To add a new file system:
 
   - Add the File System implementation, which is a subclass of 
`org.apache.flink.core.fs.FileSystem`.
   - Add a factory that instantiates that file system and declares the scheme 
under which the FileSystem is registered. This must be a subclass of 
`org.apache.flink.core.fs.FileSystemFactory`.
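
As an illustration, a minimal sketch of such a factory is shown below. The scheme 
and class names are hypothetical, and the exact set of interface methods may vary 
slightly between Flink versions:

{% highlight java %}
// Hypothetical factory that registers a custom file system under the "myfs://" scheme.
public class MyFileSystemFactory implements org.apache.flink.core.fs.FileSystemFactory {

    @Override
    public String getScheme() {
        // URIs such as myfs://host/path are routed to this factory
        return "myfs";
    }

    @Override
    public void configure(org.apache.flink.configuration.Configuration config) {
        // read custom options from the Flink configuration, if needed
    }

    @Override
    public org.apache.flink.core.fs.FileSystem create(java.net.URI fsUri) throws java.io.IOException {
        // MyFileSystem is the hypothetical subclass of org.apache.flink.core.fs.FileSystem
        return new MyFileSystem(fsUri);
    }
}
{% endhighlight %}

The factory is then announced through Java's standard service mechanism, e.g. by 
listing its fully qualified name in 
`META-INF/services/org.apache.flink.core.fs.FileSystemFactory`.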
diff --git a/docs/ops/filesystems/oss.md b/docs/ops/filesystems/oss.md
index 8711e6f..0c98c43 100644
--- a/docs/ops/filesystems/oss.md
+++ b/docs/ops/filesystems/oss.md
@@ -23,16 +23,15 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-* This will be replaced by the TOC
-{:toc}
-
 ## OSS: Object Storage Service
 
-[Aliyun Object Storage Service](https://www.aliyun.com/product/oss) (Aliyun 
OSS) is widely used especially among China’s cloud users, and it provides cloud 
object storage for a variety of use cases.
+[Aliyun Object Storage Service](https://www.aliyun.com/product/oss) (Aliyun 
OSS) is widely used, particularly among China’s cloud users, and provides 
cloud object storage for a variety of use cases.
+You can use OSS with Flink for **reading** and **writing data**, as well as in 
conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html).
 
-[Hadoop file 
system](http://hadoop.apache.org/docs/current/hadoop-aliyun/tools/hadoop-aliyun/index.html)
 supports OSS since version 2.9.1. Now, you can also use OSS with Fink for 
**reading** and **writing data**.
+* This will be replaced by the TOC
+{:toc}
 
-You can access OSS objects like this:
+You can use OSS objects like regular files by specifying paths in the 
following format:
 
 {% highlight plain %}
 oss://<your-bucket>/<object-name>
@@ -45,15 +44,15 @@ Below shows how to use OSS with Flink:
 env.readTextFile("oss://<your-bucket>/<object-name>");
 
 // Write to OSS bucket
-dataSet.writeAsText("oss://<your-bucket>/<object-name>")
+stream.writeAsText("oss://<your-bucket>/<object-name>")
 
+// Use OSS as FsStateBackend
+env.setStateBackend(new FsStateBackend("oss://<your-bucket>/<object-name>"));
 {% endhighlight %}
 
-There are two ways to use OSS with Flink, our shaded `flink-oss-fs-hadoop` 
will cover most scenarios. However, you may need to set up a specific Hadoop 
OSS FileSystem implementation if you want use OSS as YARN's resource storage 
dir ([This patch](https://issues.apache.org/jira/browse/HADOOP-15919) enables 
YARN to use OSS). Both ways are described below.
-
-### Shaded Hadoop OSS file system (recommended)
+### Shaded Hadoop OSS file system 
 
-In order to use `flink-oss-fs-hadoop`, copy the respective JAR file from the 
opt directory to the lib directory of your Flink distribution before starting 
Flink, e.g.
+To use `flink-oss-fs-hadoop`, copy the respective JAR file from the `opt` 
directory to the `lib` directory of your Flink distribution before starting 
Flink, e.g.
 
 {% highlight bash %}
 cp ./opt/flink-oss-fs-hadoop-{{ site.version }}.jar ./lib/
@@ -64,7 +63,7 @@ cp ./opt/flink-oss-fs-hadoop-{{ site.version }}.jar ./lib/
 #### Configurations setup
 After setting up the OSS FileSystem wrapper, you need to add some 
configurations to make sure that Flink is allowed to access your OSS buckets.
 
-In order to use OSS with Flink more easily, you can use the same configuration 
keys in `flink-conf.yaml` as in Hadoop's `core-site.xml`
+To allow for easy adoption, you can use the same configuration keys in 
`flink-conf.yaml` as in Hadoop's `core-site.xml`.
 
 You can see the configuration keys in the [Hadoop OSS 
documentation](http://hadoop.apache.org/docs/current/hadoop-aliyun/tools/hadoop-aliyun/index.html).
 
@@ -76,157 +75,4 @@ fs.oss.accessKeyId: Aliyun access key ID
 fs.oss.accessKeySecret: Aliyun access key secret
 {% endhighlight %}
 
-### Hadoop-provided OSS file system - manual setup
-This setup is a bit more complex and we recommend using our shaded Hadoop file 
systems instead (see above) unless required otherwise, e.g. for using OSS as 
YARN’s resource storage dir via the fs.defaultFS configuration property in 
Hadoop’s core-site.xml.
-
-#### Set OSS FileSystem
-You need to point Flink to a valid Hadoop configuration, which contains the 
following properties in core-site.xml:
-
-{% highlight xml %}
-<configuration>
-
-<property>
-    <name>fs.oss.impl</name>
-    <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
-  </property>
-
-  <property>
-    <name>fs.oss.endpoint</name>
-    <value>Aliyun OSS endpoint to connect to</value>
-    <description>Aliyun OSS endpoint to connect to. An up-to-date list is 
provided in the Aliyun OSS Documentation.</description>
-  </property>
-
-  <property>
-    <name>fs.oss.accessKeyId</name>
-    <description>Aliyun access key ID</description>
-  </property>
-
-  <property>
-    <name>fs.oss.accessKeySecret</name>
-    <description>Aliyun access key secret</description>
-  </property>
-
-  <property>
-    <name>fs.oss.buffer.dir</name>
-    <value>/tmp/oss</value>
-  </property>
-
-</property>
-
-</configuration>
-{% endhighlight %}
-
-#### Hadoop Configuration
-
-You can specify the [Hadoop configuration](../config.html#hdfs) in various 
ways pointing Flink to
-the path of the Hadoop configuration directory, for example
-- by setting the environment variable `HADOOP_CONF_DIR`, or
-- by setting the `fs.hdfs.hadoopconf` configuration option in 
`flink-conf.yaml`:
-{% highlight yaml %}
-fs.hdfs.hadoopconf: /path/to/etc/hadoop
-{% endhighlight %}
-
-This registers `/path/to/etc/hadoop` as Hadoop's configuration directory with 
Flink. Flink will look for the `core-site.xml` and `hdfs-site.xml` files in the 
specified directory.
-
-#### Provide OSS FileSystem Dependency
-
-You can find Hadoop OSS FileSystem are packaged in the hadoop-aliyun artifact. 
This JAR and all its dependencies need to be added to Flink’s classpath, i.e. 
the class path of both Job and TaskManagers.
-
-There are multiple ways of adding JARs to Flink’s class path, the easiest 
being simply to drop the JARs in Flink’s lib folder. You need to copy the 
hadoop-aliyun JAR with all its dependencies (You can find these as part of the 
Hadoop binaries in hadoop-3/share/hadoop/tools/lib). You can also export the 
directory containing these JARs as part of the HADOOP_CLASSPATH environment 
variable on all machines.
-
-## An Example
-Below is an example shows the result of our setup (data is generated by TPC-DS 
tool)
-
-{% highlight java %}
-// Read from OSS bucket
-scala> val dataSet = 
benv.readTextFile("oss://<your-bucket>/50/call_center/data-m-00049")
-dataSet: org.apache.flink.api.scala.DataSet[String] = 
org.apache.flink.api.scala.DataSet@31940704
-
-scala> dataSet.print()
-1|AAAAAAAABAAAAAAA|1998-01-01|||2450952|NY 
Metro|large|2935|1670015|8AM-4PM|Bob Belcher|6|More than other authori|Shared 
others could not count fully dollars. New members ca|Julius 
Tran|3|pri|6|cally|730|Ash Hill|Boulevard|Suite 0|Oak Grove|Williamson 
County|TN|38370|United States|-5|0.11|
-2|AAAAAAAACAAAAAAA|1998-01-01|2000-12-31||2450806|Mid 
Atlantic|medium|1574|594972|8AM-8AM|Felipe Perkins|2|A bit narrow forms matter 
animals. Consist|Largely blank years put substantially deaf, new others. 
Question|Julius Durham|5|anti|1|ought|984|Center Hill|Way|Suite 
70|Midway|Williamson County|TN|31904|United States|-5|0.12|
-3|AAAAAAAACAAAAAAA|2001-01-01|||2450806|Mid 
Atlantic|medium|1574|1084486|8AM-4PM|Mark Hightower|2|Wrong troops shall work 
sometimes in a opti|Largely blank years put substantially deaf, new others. 
Question|Julius Durham|1|ought|2|able|984|Center Hill|Way|Suite 
70|Midway|Williamson County|TN|31904|United States|-5|0.01|
-4|AAAAAAAAEAAAAAAA|1998-01-01|2000-01-01||2451063|North 
Midwest|medium|10137|6578913|8AM-4PM|Larry Mccray|2|Dealers make most 
historical, direct students|Rich groups catch longer other fears; 
future,|Matthew Clifton|4|ese|3|pri|463|Pine Ridge|RD|Suite U|Five 
Points|Ziebach County|SD|56098|United States|-6|0.05|
-5|AAAAAAAAEAAAAAAA|2000-01-02|2001-12-31||2451063|North 
Midwest|small|17398|4610470|8AM-8AM|Larry Mccray|2|Dealers make most 
historical, direct students|Blue, due beds come. Politicians would not make far 
thoughts. Specifically new horses partic|Gary Colburn|4|ese|3|pri|463|Pine 
Ridge|RD|Suite U|Five Points|Ziebach County|SD|56098|United States|-6|0.12|
-6|AAAAAAAAEAAAAAAA|2002-01-01|||2451063|North 
Midwest|medium|13118|6585236|8AM-4PM|Larry Mccray|5|Silly particles could 
pro|Blue, due beds come. Politicians would not make far thoughts. Specifically 
new horses partic|Gary Colburn|5|anti|3|pri|463|Pine Ridge|RD|Suite U|Five 
Points|Ziebach County|SD|56098|United States|-6|0.11|
-7|AAAAAAAAHAAAAAAA|1998-01-01|||2451024|Pacific 
Northwest|small|6280|1739560|8AM-4PM|Alden Snyder|6|Major, formal states can 
suppor|Reduced, subsequent bases could not lik|Frederick 
Weaver|5|anti|4|ese|415|Jefferson Tenth|Court|Suite 180|Riverside|Walker 
County|AL|39231|United States|-6|0.00|
-8|AAAAAAAAIAAAAAAA|1998-01-01|2000-12-31||2450808|California|small|4766|2459256|8AM-12AM|Wayne
 Ray|6|Here possible notions arrive only. Ar|Common, free creditors should 
exper|Daniel Weller|5|anti|2|able|550|Cedar Elm|Ct.|Suite I|Fairview|Williamson 
County|TN|35709|United States|-5|0.06|
-
-scala> dataSet.count()
-res0: Long = 8
-
-// Write to OSS bucket
-scala> dataSet.writeAsText("oss://<your-bucket>/50/call_center/data-m-00049.1")
-
-scala> benv.execute("My batch program")
-res1: org.apache.flink.api.common.JobExecutionResult = 
org.apache.flink.api.common.JobExecutionResult@77476fcf
-
-scala> val newDataSet = 
benv.readTextFile("oss://<your-bucket>/50/call_center/data-m-00049.1")
-newDataSet: org.apache.flink.api.scala.DataSet[String] = 
org.apache.flink.api.scala.DataSet@40b70f31
-
-scala> newDataSet.count()
-res2: Long = 8
-
-{% endhighlight %}
-
-## Common Issues
-### Could not find OSS file system
-If your job submission fails with an Exception message like below, please 
check if our shaded jar (flink-oss-fs-hadoop-{{ site.version }}.jar) is in the 
lib dir.
-
-{% highlight plain %}
-Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
set up JobManager
-       at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
-       at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
-       at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
-       at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
-       ... 7 more
-Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: Could not find a file system implementation for scheme 'oss'. 
The scheme is not directly supported by Flink and no Hadoop file system to 
support this scheme could be loaded.
-       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:273)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:827)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
-       at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
-       ... 10 more
-Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Could not find a file system implementation for scheme 'oss'. The scheme is not 
directly supported by Flink and no Hadoop file system to support this scheme 
could be loaded.
-       at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
-       at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
-       at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
-       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
-       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:259)
-       ... 17 more
-Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Hadoop is not in the classpath/dependencies.
-       at 
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
-       at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
-       ... 22 more
-{% endhighlight %}
-
-### Missing configuration(s)
-If your job submission fails with an Exception message like below, please 
check if the corresponding configurations exits in `flink-conf.yaml`
-
-{% highlight plain %}
-Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: Aliyun OSS endpoint should not be null or empty. Please set 
proper endpoint with 'fs.oss.endpoint'.
-       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:273)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:827)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
-       at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
-       ... 10 more
-Caused by: java.lang.IllegalArgumentException: Aliyun OSS endpoint should not 
be null or empty. Please set proper endpoint with 'fs.oss.endpoint'.
-       at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.initialize(AliyunOSSFileSystemStore.java:145)
-       at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:323)
-       at 
org.apache.flink.fs.osshadoop.OSSFileSystemFactory.create(OSSFileSystemFactory.java:87)
-       at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
-       at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
-       at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
-       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
-       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:259)
-       ... 17 more
-{% endhighlight %}
+{% top %}
diff --git a/docs/ops/filesystems/oss.zh.md b/docs/ops/filesystems/oss.zh.md
index fa2dfb1..d6834d1 100644
--- a/docs/ops/filesystems/oss.zh.md
+++ b/docs/ops/filesystems/oss.zh.md
@@ -23,17 +23,15 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-* ToC
-{:toc}
-
-
 ## OSS: Object Storage Service
 
-[Aliyun Object Storage Service](https://www.aliyun.com/product/oss) (Aliyun 
OSS) is widely used especially among China’s cloud users, and it provides cloud 
object storage for a variety of use cases.
+[Aliyun Object Storage Service](https://www.aliyun.com/product/oss) (Aliyun 
OSS) is widely used, particularly among China’s cloud users, and provides 
cloud object storage for a variety of use cases.
+You can use OSS with Flink for **reading** and **writing data**, as well as in 
conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html).
 
-[Hadoop file 
system](http://hadoop.apache.org/docs/current/hadoop-aliyun/tools/hadoop-aliyun/index.html)
 supports OSS since version 2.9.1. Now, you can also use OSS with Fink for 
**reading** and **writing data**.
+* This will be replaced by the TOC
+{:toc}
 
-You can access OSS objects like this:
+You can use OSS objects like regular files by specifying paths in the 
following format:
 
 {% highlight plain %}
 oss://<your-bucket>/<object-name>
@@ -46,15 +44,15 @@ Below shows how to use OSS with Flink:
 env.readTextFile("oss://<your-bucket>/<object-name>");
 
 // Write to OSS bucket
-dataSet.writeAsText("oss://<your-bucket>/<object-name>")
+stream.writeAsText("oss://<your-bucket>/<object-name>")
 
+// Use OSS as FsStateBackend
+env.setStateBackend(new FsStateBackend("oss://<your-bucket>/<object-name>"));
 {% endhighlight %}
 
-There are two ways to use OSS with Flink, our shaded `flink-oss-fs-hadoop` 
will cover most scenarios. However, you may need to set up a specific Hadoop 
OSS FileSystem implementation if you want use OSS as YARN's resource storage 
dir ([This patch](https://issues.apache.org/jira/browse/HADOOP-15919) enables 
YARN to use OSS). Both ways are described below.
-
-### Shaded Hadoop OSS file system (recommended)
+### Shaded Hadoop OSS file system 
 
-In order to use `flink-oss-fs-hadoop`, copy the respective JAR file from the 
opt directory to the lib directory of your Flink distribution before starting 
Flink, e.g.
+To use `flink-oss-fs-hadoop`, copy the respective JAR file from the `opt` 
directory to the `lib` directory of your Flink distribution before starting 
Flink, e.g.
 
 {% highlight bash %}
 cp ./opt/flink-oss-fs-hadoop-{{ site.version }}.jar ./lib/
@@ -65,7 +63,7 @@ cp ./opt/flink-oss-fs-hadoop-{{ site.version }}.jar ./lib/
 #### Configurations setup
 After setting up the OSS FileSystem wrapper, you need to add some 
configurations to make sure that Flink is allowed to access your OSS buckets.
 
-In order to use OSS with Flink more easily, you can use the same configuration 
keys in `flink-conf.yaml` as in Hadoop's `core-site.xml`
+To allow for easy adoption, you can use the same configuration keys in 
`flink-conf.yaml` as in Hadoop's `core-site.xml`.
 
 You can see the configuration keys in the [Hadoop OSS 
documentation](http://hadoop.apache.org/docs/current/hadoop-aliyun/tools/hadoop-aliyun/index.html).
 
@@ -77,157 +75,4 @@ fs.oss.accessKeyId: Aliyun access key ID
 fs.oss.accessKeySecret: Aliyun access key secret
 {% endhighlight %}
 
-### Hadoop-provided OSS file system - manual setup
-This setup is a bit more complex and we recommend using our shaded Hadoop file 
systems instead (see above) unless required otherwise, e.g. for using OSS as 
YARN’s resource storage dir via the fs.defaultFS configuration property in 
Hadoop’s core-site.xml.
-
-#### Set OSS FileSystem
-You need to point Flink to a valid Hadoop configuration, which contains the 
following properties in core-site.xml:
-
-{% highlight xml %}
-<configuration>
-
-<property>
-    <name>fs.oss.impl</name>
-    <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
-  </property>
-
-  <property>
-    <name>fs.oss.endpoint</name>
-    <value>Aliyun OSS endpoint to connect to</value>
-    <description>Aliyun OSS endpoint to connect to. An up-to-date list is 
provided in the Aliyun OSS Documentation.</description>
-  </property>
-
-  <property>
-    <name>fs.oss.accessKeyId</name>
-    <description>Aliyun access key ID</description>
-  </property>
-
-  <property>
-    <name>fs.oss.accessKeySecret</name>
-    <description>Aliyun access key secret</description>
-  </property>
-
-  <property>
-    <name>fs.oss.buffer.dir</name>
-    <value>/tmp/oss</value>
-  </property>
-
-</property>
-
-</configuration>
-{% endhighlight %}
-
-#### Hadoop Configuration
-
-You can specify the [Hadoop configuration](../config.html#hdfs) in various 
ways pointing Flink to
-the path of the Hadoop configuration directory, for example
-- by setting the environment variable `HADOOP_CONF_DIR`, or
-- by setting the `fs.hdfs.hadoopconf` configuration option in 
`flink-conf.yaml`:
-{% highlight yaml %}
-fs.hdfs.hadoopconf: /path/to/etc/hadoop
-{% endhighlight %}
-
-This registers `/path/to/etc/hadoop` as Hadoop's configuration directory with 
Flink. Flink will look for the `core-site.xml` and `hdfs-site.xml` files in the 
specified directory.
-
-#### Provide OSS FileSystem Dependency
-
-You can find Hadoop OSS FileSystem are packaged in the hadoop-aliyun artifact. 
This JAR and all its dependencies need to be added to Flink’s classpath, i.e. 
the class path of both Job and TaskManagers.
-
-There are multiple ways of adding JARs to Flink’s class path, the easiest 
being simply to drop the JARs in Flink’s lib folder. You need to copy the 
hadoop-aliyun JAR with all its dependencies (You can find these as part of the 
Hadoop binaries in hadoop-3/share/hadoop/tools/lib). You can also export the 
directory containing these JARs as part of the HADOOP_CLASSPATH environment 
variable on all machines.
-
-## An Example
-Below is an example shows the result of our setup (data is generated by TPC-DS 
tool)
-
-{% highlight java %}
-// Read from OSS bucket
-scala> val dataSet = 
benv.readTextFile("oss://<your-bucket>/50/call_center/data-m-00049")
-dataSet: org.apache.flink.api.scala.DataSet[String] = 
org.apache.flink.api.scala.DataSet@31940704
-
-scala> dataSet.print()
-1|AAAAAAAABAAAAAAA|1998-01-01|||2450952|NY 
Metro|large|2935|1670015|8AM-4PM|Bob Belcher|6|More than other authori|Shared 
others could not count fully dollars. New members ca|Julius 
Tran|3|pri|6|cally|730|Ash Hill|Boulevard|Suite 0|Oak Grove|Williamson 
County|TN|38370|United States|-5|0.11|
-2|AAAAAAAACAAAAAAA|1998-01-01|2000-12-31||2450806|Mid 
Atlantic|medium|1574|594972|8AM-8AM|Felipe Perkins|2|A bit narrow forms matter 
animals. Consist|Largely blank years put substantially deaf, new others. 
Question|Julius Durham|5|anti|1|ought|984|Center Hill|Way|Suite 
70|Midway|Williamson County|TN|31904|United States|-5|0.12|
-3|AAAAAAAACAAAAAAA|2001-01-01|||2450806|Mid 
Atlantic|medium|1574|1084486|8AM-4PM|Mark Hightower|2|Wrong troops shall work 
sometimes in a opti|Largely blank years put substantially deaf, new others. 
Question|Julius Durham|1|ought|2|able|984|Center Hill|Way|Suite 
70|Midway|Williamson County|TN|31904|United States|-5|0.01|
-4|AAAAAAAAEAAAAAAA|1998-01-01|2000-01-01||2451063|North 
Midwest|medium|10137|6578913|8AM-4PM|Larry Mccray|2|Dealers make most 
historical, direct students|Rich groups catch longer other fears; 
future,|Matthew Clifton|4|ese|3|pri|463|Pine Ridge|RD|Suite U|Five 
Points|Ziebach County|SD|56098|United States|-6|0.05|
-5|AAAAAAAAEAAAAAAA|2000-01-02|2001-12-31||2451063|North 
Midwest|small|17398|4610470|8AM-8AM|Larry Mccray|2|Dealers make most 
historical, direct students|Blue, due beds come. Politicians would not make far 
thoughts. Specifically new horses partic|Gary Colburn|4|ese|3|pri|463|Pine 
Ridge|RD|Suite U|Five Points|Ziebach County|SD|56098|United States|-6|0.12|
-6|AAAAAAAAEAAAAAAA|2002-01-01|||2451063|North 
Midwest|medium|13118|6585236|8AM-4PM|Larry Mccray|5|Silly particles could 
pro|Blue, due beds come. Politicians would not make far thoughts. Specifically 
new horses partic|Gary Colburn|5|anti|3|pri|463|Pine Ridge|RD|Suite U|Five 
Points|Ziebach County|SD|56098|United States|-6|0.11|
-7|AAAAAAAAHAAAAAAA|1998-01-01|||2451024|Pacific 
Northwest|small|6280|1739560|8AM-4PM|Alden Snyder|6|Major, formal states can 
suppor|Reduced, subsequent bases could not lik|Frederick 
Weaver|5|anti|4|ese|415|Jefferson Tenth|Court|Suite 180|Riverside|Walker 
County|AL|39231|United States|-6|0.00|
-8|AAAAAAAAIAAAAAAA|1998-01-01|2000-12-31||2450808|California|small|4766|2459256|8AM-12AM|Wayne
 Ray|6|Here possible notions arrive only. Ar|Common, free creditors should 
exper|Daniel Weller|5|anti|2|able|550|Cedar Elm|Ct.|Suite I|Fairview|Williamson 
County|TN|35709|United States|-5|0.06|
-
-scala> dataSet.count()
-res0: Long = 8
-
-// Write to OSS bucket
-scala> dataSet.writeAsText("oss://<your-bucket>/50/call_center/data-m-00049.1")
-
-scala> benv.execute("My batch program")
-res1: org.apache.flink.api.common.JobExecutionResult = 
org.apache.flink.api.common.JobExecutionResult@77476fcf
-
-scala> val newDataSet = 
benv.readTextFile("oss://<your-bucket>/50/call_center/data-m-00049.1")
-newDataSet: org.apache.flink.api.scala.DataSet[String] = 
org.apache.flink.api.scala.DataSet@40b70f31
-
-scala> newDataSet.count()
-res2: Long = 8
-
-{% endhighlight %}
-
-## Common Issues
-### Could not find OSS file system
-If your job submission fails with an Exception message like below, please 
check if our shaded jar (flink-oss-fs-hadoop-{{ site.version }}.jar) is in the 
lib dir.
-
-{% highlight plain %}
-Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
set up JobManager
-       at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
-       at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
-       at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
-       at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
-       ... 7 more
-Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: Could not find a file system implementation for scheme 'oss'. 
The scheme is not directly supported by Flink and no Hadoop file system to 
support this scheme could be loaded.
-       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:273)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:827)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
-       at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
-       ... 10 more
-Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Could not find a file system implementation for scheme 'oss'. The scheme is not 
directly supported by Flink and no Hadoop file system to support this scheme 
could be loaded.
-       at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
-       at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
-       at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
-       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
-       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:259)
-       ... 17 more
-Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Hadoop is not in the classpath/dependencies.
-       at 
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
-       at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
-       ... 22 more
-{% endhighlight %}
-
-### Missing configuration(s)
-If your job submission fails with an Exception message like below, please 
check if the corresponding configurations exits in `flink-conf.yaml`
-
-{% highlight plain %}
-Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: Aliyun OSS endpoint should not be null or empty. Please set 
proper endpoint with 'fs.oss.endpoint'.
-       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:273)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:827)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
-       at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
-       at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
-       ... 10 more
-Caused by: java.lang.IllegalArgumentException: Aliyun OSS endpoint should not 
be null or empty. Please set proper endpoint with 'fs.oss.endpoint'.
-       at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.initialize(AliyunOSSFileSystemStore.java:145)
-       at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:323)
-       at 
org.apache.flink.fs.osshadoop.OSSFileSystemFactory.create(OSSFileSystemFactory.java:87)
-       at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
-       at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
-       at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
-       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
-       at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
-       at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:259)
-       ... 17 more
-{% endhighlight %}
+{% top %}
diff --git a/docs/ops/filesystems/s3.md b/docs/ops/filesystems/s3.md
index b668ac9..f25e266 100644
--- a/docs/ops/filesystems/s3.md
+++ b/docs/ops/filesystems/s3.md
@@ -49,31 +49,27 @@ env.setStateBackend(new 
FsStateBackend("s3://<your-bucket>/<endpoint>"));
 
 Note that these examples are *not* exhaustive and you can use S3 in other 
places as well, including your [high availability 
setup](../jobmanager_high_availability.html) or the [RocksDBStateBackend]({{ 
site.baseurl }}/ops/state/state_backends.html#the-rocksdbstatebackend); 
everywhere that Flink expects a FileSystem URI.
 
-For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and 
`flink-s3-fs-presto` S3
-filesystem wrappers which are fairly easy to set up. For some cases, however, 
e.g. for using S3 as
-YARN's resource storage dir, it may be necessary to set up a specific Hadoop 
S3 FileSystem
-implementation. Both ways are described below.
+For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and 
`flink-s3-fs-presto` S3 filesystem wrappers which are self-contained and easy 
to set up.
+For some cases, however, e.g., for using S3 as YARN's resource storage dir, it 
may be necessary to set up a specific [Hadoop S3 
FileSystem](../deployment/aws.html#hadoop-provided-s3-file-systems) 
implementation.
 
-### Shaded Hadoop/Presto S3 file systems
+### Shaded Hadoop/Presto S3 File Systems
 
 {% panel **Note:** You don't have to configure this manually if you are 
running [Flink on EMR](../deployment/aws.html). %}
 
 Flink provides two file systems to talk to Amazon S3, `flink-s3-fs-presto` and 
`flink-s3-fs-hadoop`.
-Both implementations are self-contained with no dependency footprint.
-There is no need to add Hadoop to the classpath to use them.
-Both internally use some Hadoop code, but "shade away" all classes to avoid 
any dependency conflicts.
+Both implementations are self-contained with no dependency footprint, so there 
is no need to add Hadoop to the classpath to use them.
 
   - `flink-s3-fs-presto`, registered under the scheme *"s3://"* and 
*"s3p://"*, is based on code from the [Presto project](https://prestodb.io/).
-  You can configure it the same way you can [configure the Presto file 
system](https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration)
 by placing adding the configurations to your `flink-conf.yaml`.
+  You can configure it the same way you can [configure the Presto file 
system](https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration) 
by adding the configurations to your `flink-conf.yaml`. Presto is the 
recommended file system for checkpointing to S3.
       
   - `flink-s3-fs-hadoop`, registered under *"s3://"* and *"s3a://"*, based on 
code from the [Hadoop Project](https://hadoop.apache.org/).
-  The file system can be [configured exactly like Hadoop's 
s3a](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A)
 by placing adding the configurations to your `flink-conf.yaml`.
+  The file system can be [configured exactly like Hadoop's 
s3a](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) 
by adding the configurations to your `flink-conf.yaml`. Shaded Hadoop is the 
only S3 file system with support for the [StreamingFileSink]({{ site.baseurl}}/dev/connectors/streamfile_sink.html).
     
 Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem
 wrappers for URIs with the `s3://` scheme, `flink-s3-fs-hadoop` also registers
 for `s3a://` and `flink-s3-fs-presto` also registers for `s3p://`, so you can
 use this to use both at the same time.
-This can happen when, for example, the job uses the [StreamingFileSink]({{ 
site.baseurl}}/dev/connectors/streamfile_sink.html) which only supports Hadoop, 
but uses Presto for checkpointing.
+For example, a job may use the [StreamingFileSink]({{ site.baseurl}}/dev/connectors/streamfile_sink.html), 
which only supports Hadoop, but use Presto for checkpointing.
 In this case, it is advised to use explicitly *"s3a://"* as a scheme for the 
sink (Hadoop) and *"s3p://"* for checkpointing (Presto).
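
A minimal sketch of combining both schemes in one job is shown below; it assumes 
a `DataStream<String> stream` defined elsewhere, and the bucket name and paths 
are placeholders:

{% highlight java %}
// Checkpoints go through the Presto S3 file system (s3p://), while the
// StreamingFileSink writes its output through the Hadoop one (s3a://).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("s3p://<your-bucket>/checkpoints"));

StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("s3a://<your-bucket>/output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();

stream.addSink(sink);
{% endhighlight %}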
     
 To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the 
respective JAR file from the `opt` directory to the `lib` directory of your 
Flink distribution before starting Flink, e.g.
@@ -88,7 +84,7 @@ After setting up the S3 FileSystem wrapper, you need to make 
sure that Flink is
 
 ##### Identity and Access Management (IAM) (Recommended)
 
-The recommended way of setting up credentials on AWS is via [Identity and 
Access Management 
(IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You 
can use IAM features to securely give Flink instances the credentials that they 
need in order to access S3 buckets. Details about how to do this are beyond the 
scope of this documentation. Please refer to the AWS user guide. What you are 
looking for are [IAM 
Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam- [...]
+The recommended way of setting up credentials on AWS is via [Identity and 
Access Management 
(IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You 
can use IAM features to securely give Flink instances the credentials that they 
need to access S3 buckets. Details about how to do this are beyond the scope of 
this documentation. Please refer to the AWS user guide. What you are looking 
for are [IAM 
Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for [...]
 
 If you set this up correctly, you can manage access to S3 within AWS and don't 
need to distribute any access keys to Flink.
 
@@ -106,7 +102,7 @@ s3.secret-key: your-secret-key
 ## Configure Non-S3 Endpoint
 
 The S3 Filesystems also support using S3 compliant object stores such as 
[IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and 
[Minio](https://min.io/).
-To do so, simply configure your endpoint in `flink-conf.yaml`. 
+To do so, configure your endpoint in `flink-conf.yaml`. 
 
 {% highlight yaml %}
 s3.endpoint: your-endpoint-hostname
@@ -115,18 +111,15 @@ s3.endpoint: your-endpoint-hostname
 ## Entropy injection for S3 file systems
 
 The bundled S3 file systems (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) 
support entropy injection. Entropy injection is
-a technique to improve scalability of AWS S3 buckets through adding some 
random characters near the beginning of the key.
+a technique to improve the scalability of AWS S3 buckets through adding some 
random characters near the beginning of the key.
 
-If entropy injection is activated, a configured substring in the paths will be 
replaced by random characters. For example, path
+If entropy injection is activated, a configured substring in the path is 
replaced with random characters. For example, path
 `s3://my-bucket/checkpoints/_entropy_/dashboard-job/` would be replaced by 
something like `s3://my-bucket/checkpoints/gf36ikvg/dashboard-job/`.
-
-**Note that this only happens when the file creation passes the option to 
inject entropy!**, otherwise the file path will
-simply remove the entropy key substring. See
-[FileSystem.create(Path, 
WriteOption)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-)
+**This only happens when the file creation passes the option to inject 
entropy!**
+Otherwise, the entropy key substring is simply removed from the file path. See 
[FileSystem.create(Path, 
WriteOption)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-)
 for details.
 
-*Note: The Flink runtime currently passes the option to inject entropy only to 
checkpoint data files.*
-*All other files, including checkpoint metadata and external URI do not inject 
entropy, to keep checkpoint URIs predictable.*
+{% panel **Note:** The Flink runtime currently passes the option to inject 
entropy only to checkpoint data files. All other files, including checkpoint 
metadata and external URIs, do not inject entropy, in order to keep checkpoint 
URIs predictable. %}
 
 To enable entropy injection, configure the *entropy key* and the *entropy 
length* parameters.
 
@@ -138,4 +131,6 @@ s3.entropy.length: 4 (default)
 
 The `s3.entropy.key` defines the string in paths that is replaced by the 
random characters. Paths that do not contain the entropy key are left unchanged.
 If a file system operation does not pass the *"inject entropy"* write option, 
the entropy key substring is simply removed.
-The `s3.entropy.length` defined the number of random alphanumeric characters 
to replace the entropy key with.
\ No newline at end of file
+The `s3.entropy.length` defines the number of random alphanumeric characters 
used for entropy.
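
For instance, with the settings above, a checkpoint directory configured with the 
entropy key could look like the following in `flink-conf.yaml` (the bucket is a 
placeholder):

{% highlight yaml %}
s3.entropy.key: _entropy_
s3.entropy.length: 4

# checkpoint data files then land under paths such as
# s3://<your-bucket>/checkpoints/gf36/dashboard-job/
state.checkpoints.dir: s3://<your-bucket>/checkpoints/_entropy_/dashboard-job
{% endhighlight %}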
+
+{% top %}
\ No newline at end of file
diff --git a/docs/ops/filesystems/s3.zh.md b/docs/ops/filesystems/s3.zh.md
index b668ac9..f25e266 100644
--- a/docs/ops/filesystems/s3.zh.md
+++ b/docs/ops/filesystems/s3.zh.md
@@ -49,31 +49,27 @@ env.setStateBackend(new 
FsStateBackend("s3://<your-bucket>/<endpoint>"));
 
 Note that these examples are *not* exhaustive and you can use S3 in other 
places as well, including your [high availability 
setup](../jobmanager_high_availability.html) or the [RocksDBStateBackend]({{ 
site.baseurl }}/ops/state/state_backends.html#the-rocksdbstatebackend); 
everywhere that Flink expects a FileSystem URI.
 
-For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and 
`flink-s3-fs-presto` S3
-filesystem wrappers which are fairly easy to set up. For some cases, however, 
e.g. for using S3 as
-YARN's resource storage dir, it may be necessary to set up a specific Hadoop 
S3 FileSystem
-implementation. Both ways are described below.
+For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and 
`flink-s3-fs-presto` S3 filesystem wrappers which are self-contained and easy 
to set up.
+For some cases, however, e.g., for using S3 as YARN's resource storage dir, it 
may be necessary to set up a specific [Hadoop S3 
FileSystem](../deployment/aws.html#hadoop-provided-s3-file-systems) 
implementation.
 
-### Shaded Hadoop/Presto S3 file systems
+### Shaded Hadoop/Presto S3 File Systems
 
 {% panel **Note:** You don't have to configure this manually if you are 
running [Flink on EMR](../deployment/aws.html). %}
 
 Flink provides two file systems to talk to Amazon S3, `flink-s3-fs-presto` and 
`flink-s3-fs-hadoop`.
-Both implementations are self-contained with no dependency footprint.
-There is no need to add Hadoop to the classpath to use them.
-Both internally use some Hadoop code, but "shade away" all classes to avoid 
any dependency conflicts.
+Both implementations are self-contained with no dependency footprint, so there 
is no need to add Hadoop to the classpath to use them.
 
   - `flink-s3-fs-presto`, registered under the scheme *"s3://"* and 
*"s3p://"*, is based on code from the [Presto project](https://prestodb.io/).
-  You can configure it the same way you can [configure the Presto file 
system](https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration)
 by placing adding the configurations to your `flink-conf.yaml`.
+  You can configure it the same way you can [configure the Presto file 
system](https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration) 
by adding the configurations to your `flink-conf.yaml`. Presto is the 
recommended file system for checkpointing to S3.
       
   - `flink-s3-fs-hadoop`, registered under *"s3://"* and *"s3a://"*, based on 
code from the [Hadoop Project](https://hadoop.apache.org/).
-  The file system can be [configured exactly like Hadoop's 
s3a](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A)
 by placing adding the configurations to your `flink-conf.yaml`.
+  The file system can be [configured exactly like Hadoop's 
s3a](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) 
by adding the configurations to your `flink-conf.yaml`. Shaded Hadoop is the 
only S3 file system with support for the [StreamingFileSink]({{ site.baseurl}}/dev/connectors/streamfile_sink.html).
     
 Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem
 wrappers for URIs with the `s3://` scheme, `flink-s3-fs-hadoop` also registers
 for `s3a://` and `flink-s3-fs-presto` also registers for `s3p://`, so you can
 use this to use both at the same time.
-This can happen when, for example, the job uses the [StreamingFileSink]({{ 
site.baseurl}}/dev/connectors/streamfile_sink.html) which only supports Hadoop, 
but uses Presto for checkpointing.
+For example, a job may use the [StreamingFileSink]({{ site.baseurl}}/dev/connectors/streamfile_sink.html), 
which only supports Hadoop, but use Presto for checkpointing.
 In this case, it is advised to use explicitly *"s3a://"* as a scheme for the 
sink (Hadoop) and *"s3p://"* for checkpointing (Presto).
     
 To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the 
respective JAR file from the `opt` directory to the `lib` directory of your 
Flink distribution before starting Flink, e.g.
@@ -88,7 +84,7 @@ After setting up the S3 FileSystem wrapper, you need to make 
sure that Flink is
 
 ##### Identity and Access Management (IAM) (Recommended)
 
-The recommended way of setting up credentials on AWS is via [Identity and 
Access Management 
(IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You 
can use IAM features to securely give Flink instances the credentials that they 
need in order to access S3 buckets. Details about how to do this are beyond the 
scope of this documentation. Please refer to the AWS user guide. What you are 
looking for are [IAM 
Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam- [...]
+The recommended way of setting up credentials on AWS is via [Identity and 
Access Management 
(IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You 
can use IAM features to securely give Flink instances the credentials that they 
need to access S3 buckets. Details about how to do this are beyond the scope of 
this documentation. Please refer to the AWS user guide. What you are looking 
for are [IAM 
Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for [...]
 
 If you set this up correctly, you can manage access to S3 within AWS and don't 
need to distribute any access keys to Flink.
 
@@ -106,7 +102,7 @@ s3.secret-key: your-secret-key
 ## Configure Non-S3 Endpoint
 
 The S3 Filesystems also support using S3 compliant object stores such as 
[IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and 
[Minio](https://min.io/).
-To do so, simply configure your endpoint in `flink-conf.yaml`. 
+To do so, configure your endpoint in `flink-conf.yaml`. 
 
 {% highlight yaml %}
 s3.endpoint: your-endpoint-hostname
@@ -115,18 +111,15 @@ s3.endpoint: your-endpoint-hostname
 ## Entropy injection for S3 file systems
 
 The bundled S3 file systems (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) 
support entropy injection. Entropy injection is
-a technique to improve scalability of AWS S3 buckets through adding some 
random characters near the beginning of the key.
+a technique to improve the scalability of AWS S3 buckets through adding some 
random characters near the beginning of the key.
 
-If entropy injection is activated, a configured substring in the paths will be 
replaced by random characters. For example, path
+If entropy injection is activated, a configured substring in the path is 
replaced with random characters. For example, path
 `s3://my-bucket/checkpoints/_entropy_/dashboard-job/` would be replaced by 
something like `s3://my-bucket/checkpoints/gf36ikvg/dashboard-job/`.
-
-**Note that this only happens when the file creation passes the option to 
inject entropy!**, otherwise the file path will
-simply remove the entropy key substring. See
-[FileSystem.create(Path, 
WriteOption)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-)
+**This only happens when the file creation passes the option to inject 
entropy!**
+Otherwise, the entropy key substring is simply removed from the file path. See 
[FileSystem.create(Path, 
WriteOption)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-)
 for details.
 
-*Note: The Flink runtime currently passes the option to inject entropy only to 
checkpoint data files.*
-*All other files, including checkpoint metadata and external URI do not inject 
entropy, to keep checkpoint URIs predictable.*
+{% panel **Note:** The Flink runtime currently passes the option to inject 
entropy only to checkpoint data files. All other files, including checkpoint 
metadata and external URIs, do not inject entropy, in order to keep checkpoint 
URIs predictable. %}
 
 To enable entropy injection, configure the *entropy key* and the *entropy 
length* parameters.
 
@@ -138,4 +131,6 @@ s3.entropy.length: 4 (default)
 
 The `s3.entropy.key` defines the string in paths that is replaced by the 
random characters. Paths that do not contain the entropy key are left unchanged.
 If a file system operation does not pass the *"inject entropy"* write option, 
the entropy key substring is simply removed.
-The `s3.entropy.length` defined the number of random alphanumeric characters 
to replace the entropy key with.
\ No newline at end of file
+The `s3.entropy.length` defines the number of random alphanumeric characters 
used for entropy.
+
+{% top %}
\ No newline at end of file
