This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4c0bbc488091cf941ec9e8dfcf589bda5e7592cd Author: Seth Wiesman <[email protected]> AuthorDate: Wed May 1 11:45:57 2019 -0500 [hotfix][docs] Some fixes to FileSystem Documentation This closes #8326 --- docs/ops/deployment/aws.md | 317 ------------------------------- docs/ops/deployment/aws.zh.md | 387 -------------------------------------- docs/ops/filesystems/common.md | 27 +-- docs/ops/filesystems/common.zh.md | 27 +-- docs/ops/filesystems/index.md | 36 ++-- docs/ops/filesystems/index.zh.md | 130 +++---------- docs/ops/filesystems/oss.md | 178 ++---------------- docs/ops/filesystems/oss.zh.md | 179 ++---------------- docs/ops/filesystems/s3.md | 39 ++-- docs/ops/filesystems/s3.zh.md | 39 ++-- 10 files changed, 131 insertions(+), 1228 deletions(-) diff --git a/docs/ops/deployment/aws.md b/docs/ops/deployment/aws.md index a639e9c..b465340 100644 --- a/docs/ops/deployment/aws.md +++ b/docs/ops/deployment/aws.md @@ -63,320 +63,3 @@ HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/ {% endhighlight %} {% top %} - -### Hadoop-provided S3 file systems - -{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %} - -Apache Flink provides native [S3 FileSystem's](../filesystems/s3.html) out of the box and we recomend using them unless required otherwise, e.g. for using S3 as YARN's resource storage dir -via the `fs.defaultFS` configuration property in Hadoop's `core-site.xml`. - -#### Set S3 FileSystem - -Interaction with S3 happens via one of [Hadoop's S3 FileSystem clients](https://wiki.apache.org/hadoop/AmazonS3): - -1. `S3AFileSystem` (**recommended** for Hadoop 2.7 and later): file system for reading and writing regular files using Amazon's SDK internally. No maximum file size and works with IAM roles. -2. `NativeS3FileSystem` (for Hadoop 2.6 and earlier): file system for reading and writing regular files. Maximum object size is 5GB and does not work with IAM roles. - -##### `S3AFileSystem` (Recommended) - -This is the recommended S3 FileSystem implementation to use. It uses Amazon's SDK internally and works with IAM roles (see [Configure Access Credentials](#configure-access-credentials-1)). - -You need to point Flink to a valid Hadoop configuration, which contains the following properties in `core-site.xml`: - -{% highlight xml %} -<configuration> - -<property> - <name>fs.s3.impl</name> - <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> -</property> - -<!-- Comma separated list of local directories used to buffer - large results prior to transmitting them to S3. --> -<property> - <name>fs.s3a.buffer.dir</name> - <value>/tmp</value> -</property> - -</configuration> -{% endhighlight %} - -This registers `S3AFileSystem` as the default FileSystem for URIs with the `s3a://` scheme. - -##### `NativeS3FileSystem` - -This file system is limited to files up to 5GB in size and it does not work with IAM roles (see [Configure Access Credentials](#configure-access-credentials-1)), meaning that you have to manually configure your AWS credentials in the Hadoop config file. - -You need to point Flink to a valid Hadoop configuration, which contains the following property in `core-site.xml`: - -{% highlight xml %} -<property> - <name>fs.s3.impl</name> - <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value> -</property> -{% endhighlight %} - -This registers `NativeS3FileSystem` as the default FileSystem for URIs with the `s3://` scheme. 
- -{% top %} - -#### Hadoop Configuration - -You can specify the [Hadoop configuration](../config.html#hdfs) in various ways pointing Flink to -the path of the Hadoop configuration directory, for example -- by setting the environment variable `HADOOP_CONF_DIR`, or -- by setting the `fs.hdfs.hadoopconf` configuration option in `flink-conf.yaml`: -{% highlight yaml %} -fs.hdfs.hadoopconf: /path/to/etc/hadoop -{% endhighlight %} - -This registers `/path/to/etc/hadoop` as Hadoop's configuration directory with Flink. Flink will look for the `core-site.xml` and `hdfs-site.xml` files in the specified directory. - -{% top %} - -#### Configure Access Credentials - -{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %} - -After setting up the S3 FileSystem, you need to make sure that Flink is allowed to access your S3 buckets. - -##### Identity and Access Management (IAM) (Recommended) - -When using `S3AFileSystem`, the recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need in order to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/ [...] - -If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink. - -Note that this only works with `S3AFileSystem` and not `NativeS3FileSystem`. - -{% top %} - -##### Access Keys with `S3AFileSystem` (Discouraged) - -Access to S3 can be granted via your **access and secret key pair**. Please note that this is discouraged since the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). - -For `S3AFileSystem` you need to configure both `fs.s3a.access.key` and `fs.s3a.secret.key` in Hadoop's `core-site.xml`: - -{% highlight xml %} -<property> - <name>fs.s3a.access.key</name> - <value></value> -</property> - -<property> - <name>fs.s3a.secret.key</name> - <value></value> -</property> -{% endhighlight %} - -{% top %} - -##### Access Keys with `NativeS3FileSystem` (Discouraged) - -Access to S3 can be granted via your **access and secret key pair**. But this is discouraged and you should use `S3AFileSystem` [with the required IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). - -For `NativeS3FileSystem` you need to configure both `fs.s3.awsAccessKeyId` and `fs.s3.awsSecretAccessKey` in Hadoop's `core-site.xml`: - -{% highlight xml %} -<property> - <name>fs.s3.awsAccessKeyId</name> - <value></value> -</property> - -<property> - <name>fs.s3.awsSecretAccessKey</name> - <value></value> -</property> -{% endhighlight %} - -{% top %} - -#### Provide S3 FileSystem Dependency - -{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %} - -Hadoop's S3 FileSystem clients are packaged in the `hadoop-aws` artifact (Hadoop version 2.6 and later). This JAR and all its dependencies need to be added to Flink's classpath, i.e. the class path of both Job and TaskManagers. 
Depending on which FileSystem implementation and which Flink and Hadoop version you use, you need to provide different dependencies (see below). - -There are multiple ways of adding JARs to Flink's class path, the easiest being simply to drop the JARs in Flink's `lib` folder. You need to copy the `hadoop-aws` JAR with all its dependencies. You can also export the directory containing these JARs as part of the `HADOOP_CLASSPATH` environment variable on all machines. - -##### Flink for Hadoop 2.7 - -Depending on which file system you use, please add the following dependencies. You can find these as part of the Hadoop binaries in `hadoop-2.7/share/hadoop/tools/lib`: - -- `S3AFileSystem`: - - `hadoop-aws-2.7.3.jar` - - `aws-java-sdk-s3-1.11.183.jar` and its dependencies: - - `aws-java-sdk-core-1.11.183.jar` - - `aws-java-sdk-kms-1.11.183.jar` - - `jackson-annotations-2.6.7.jar` - - `jackson-core-2.6.7.jar` - - `jackson-databind-2.6.7.jar` - - `joda-time-2.8.1.jar` - - `httpcore-4.4.4.jar` - - `httpclient-4.5.3.jar` - -- `NativeS3FileSystem`: - - `hadoop-aws-2.7.3.jar` - - `guava-11.0.2.jar` - -Note that `hadoop-common` is available as part of Flink, but Guava is shaded by Flink. - -##### Flink for Hadoop 2.6 - -Depending on which file system you use, please add the following dependencies. You can find these as part of the Hadoop binaries in `hadoop-2.6/share/hadoop/tools/lib`: - -- `S3AFileSystem`: - - `hadoop-aws-2.6.4.jar` - - `aws-java-sdk-1.7.4.jar` and its dependencies: - - `jackson-annotations-2.1.1.jar` - - `jackson-core-2.1.1.jar` - - `jackson-databind-2.1.1.jar` - - `joda-time-2.2.jar` - - `httpcore-4.2.5.jar` - - `httpclient-4.2.5.jar` - -- `NativeS3FileSystem`: - - `hadoop-aws-2.6.4.jar` - - `guava-11.0.2.jar` - -Note that `hadoop-common` is available as part of Flink, but Guava is shaded by Flink. - -##### Flink for Hadoop 2.4 and earlier - -These Hadoop versions only have support for `NativeS3FileSystem`. This comes pre-packaged with Flink for Hadoop 2 as part of `hadoop-common`. You don't need to add anything to the classpath. - -{% top %} - -## Common Issues - -The following sections lists common issues when working with Flink on AWS. - -### Missing S3 FileSystem Configuration - -If your job submission fails with an Exception message noting that `No file system found with scheme s3` this means that no FileSystem has been configured for S3. Please check out the configuration sections for our [shaded Hadoop/Presto](#shaded-hadooppresto-s3-file-systems-recommended) or [generic Hadoop](#set-s3-filesystem) file systems for details on how to configure this properly. - -{% highlight plain %} -org.apache.flink.client.program.ProgramInvocationException: The program execution failed: - Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...] -Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: - No file system found with scheme s3, referenced in file URI 's3://<bucket>/<endpoint>'. [...] -Caused by: java.io.IOException: No file system found with scheme s3, - referenced in file URI 's3://<bucket>/<endpoint>'. 
- at o.a.f.core.fs.FileSystem.get(FileSystem.java:296) - at o.a.f.core.fs.Path.getFileSystem(Path.java:311) - at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450) - at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57) - at o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156) -{% endhighlight %} - -{% top %} - -### AWS Access Key ID and Secret Access Key Not Specified - -If you see your job failing with an Exception noting that the `AWS Access Key ID and Secret Access Key must be specified as the username or password`, your access credentials have not been set up properly. Please refer to the access credential section for our [shaded Hadoop/Presto](#configure-access-credentials) or [generic Hadoop](#configure-access-credentials-1) file systems for details on how to configure this. - -{% highlight plain %} -org.apache.flink.client.program.ProgramInvocationException: The program execution failed: - Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...] -Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) points to the - HDFS NameNode at <bucket>, but the File System could not be initialized with that address: - AWS Access Key ID and Secret Access Key must be specified as the username or password - (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId - or fs.s3n.awsSecretAccessKey properties (respectively) [...] -Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must - be specified as the username or password (respectively) of a s3 URL, or by setting - the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...] - at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70) - at o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80) - at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) - at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) - at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) - at java.lang.reflect.Method.invoke(Method.java:606) - at o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) - at o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) - at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source) - at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330) - at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321) -{% endhighlight %} - -{% top %} - -### ClassNotFoundException: NativeS3FileSystem/S3AFileSystem Not Found - -If you see this Exception, the S3 FileSystem is not part of the class path of Flink. Please refer to [S3 FileSystem dependency section](#provide-s3-filesystem-dependency) for details on how to configure this properly. 
- -{% highlight plain %} -Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found - at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186) - at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460) - at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352) - at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280) - at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57) - at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156) - ... 25 more -Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found - at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154) - at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178) - ... 32 more -Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found - at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060) - at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152) - ... 33 more -{% endhighlight %} - -{% top %} - -### IOException: `400: Bad Request` - -If you have configured everything properly, but get a `Bad Request` Exception **and** your S3 bucket is located in region `eu-central-1`, you might be running an S3 client, which does not support [Amazon's signature version 4](http://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html). - -{% highlight plain %} -[...] -Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 : Bad Request [...] -Caused by: org.jets3t.service.impl.rest.HttpException [...] -{% endhighlight %} -or -{% highlight plain %} -com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: [...] - -{% endhighlight %} - -This should not apply to our shaded Hadoop/Presto S3 file systems but can occur for Hadoop-provided -S3 file systems. In particular, all Hadoop versions up to 2.7.2 running `NativeS3FileSystem` (which -depend on `JetS3t 0.9.0` instead of a version [>= 0.9.4](http://www.jets3t.org/RELEASE_NOTES.html)) -are affected but users also reported this happening with the `S3AFileSystem`. - -Except for changing the bucket region, you may also be able to solve this by -[requesting signature version 4 for request authentication](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version), -e.g. by adding this to Flink's JVM options in `flink-conf.yaml` (see -[configuration](../config.html#common-options)): -{% highlight yaml %} -env.java.opts: -Dcom.amazonaws.services.s3.enableV4 -{% endhighlight %} - -{% top %} - -### NullPointerException at org.apache.hadoop.fs.LocalDirAllocator - -This Exception is usually caused by skipping the local buffer directory configuration `fs.s3a.buffer.dir` for the `S3AFileSystem`. Please refer to the [S3AFileSystem configuration](#s3afilesystem-recommended) section to see how to configure the `S3AFileSystem` properly. 
- -{% highlight plain %} -[...] -Caused by: java.lang.NullPointerException at -o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at -o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at -o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) at -o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at -o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at -o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at -o.a.h.fs.FileSystem.create(FileSystem.java:907) at -o.a.h.fs.FileSystem.create(FileSystem.java:888) at -o.a.h.fs.FileSystem.create(FileSystem.java:785) at -o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at -o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at -... 25 more -{% endhighlight %} - -{% top %} diff --git a/docs/ops/deployment/aws.zh.md b/docs/ops/deployment/aws.zh.md index ae43c40..b465340 100644 --- a/docs/ops/deployment/aws.zh.md +++ b/docs/ops/deployment/aws.zh.md @@ -63,390 +63,3 @@ HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/ {% endhighlight %} {% top %} - -## S3: Simple Storage Service - -[Amazon Simple Storage Service](http://aws.amazon.com/s3/) (Amazon S3) provides cloud object storage for a variety of use cases. You can use S3 with Flink for **reading** and **writing data** as well in conjunction with the [streaming **state backends**]({{ site.baseurl}}/ops/state/state_backends.html) or even as a YARN object storage. - -You can use S3 objects like regular files by specifying paths in the following format: - -{% highlight plain %} -s3://<your-bucket>/<endpoint> -{% endhighlight %} - -The endpoint can either be a single file or a directory, for example: - -{% highlight java %} -// Read from S3 bucket -env.readTextFile("s3://<bucket>/<endpoint>"); - -// Write to S3 bucket -stream.writeAsText("s3://<bucket>/<endpoint>"); - -// Use S3 as FsStatebackend -env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>")); -{% endhighlight %} - -Note that these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup](../jobmanager_high_availability.html) or the [RocksDBStateBackend]({{ site.baseurl }}/ops/state/state_backends.html#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI. - -For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and `flink-s3-fs-presto` S3 -filesystem wrappers which are fairly easy to set up. For some cases, however, e.g. for using S3 as -YARN's resource storage dir, it may be necessary to set up a specific Hadoop S3 FileSystem -implementation. Both ways are described below. - -### Shaded Hadoop/Presto S3 file systems (recommended) - -{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %} - -To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR file from the -`opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g. 
- -{% highlight bash %} -cp ./opt/flink-s3-fs-presto-{{ site.version }}.jar ./lib/ -{% endhighlight %} - -Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem -wrappers for URIs with the `s3://` scheme, `flink-s3-fs-hadoop` also registers -for `s3a://` and `flink-s3-fs-presto` also registers for `s3p://`, so you can -use this to use both at the same time. - -#### Configure Access Credentials - -After setting up the S3 FileSystem wrapper, you need to make sure that Flink is allowed to access your S3 buckets. - -##### Identity and Access Management (IAM) (Recommended) - -The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need in order to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam- [...] - -If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink. - -##### Access Keys (Discouraged) - -Access to S3 can be granted via your **access and secret key pair**. Please note that this is discouraged since the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). - -You need to configure both `s3.access-key` and `s3.secret-key` in Flink's `flink-conf.yaml`: - -{% highlight yaml %} -s3.access-key: your-access-key -s3.secret-key: your-secret-key -{% endhighlight %} - -{% top %} - -### Hadoop-provided S3 file systems - manual setup - -{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %} - -This setup is a bit more complex and we recommend using our shaded Hadoop/Presto file systems -instead (see above) unless required otherwise, e.g. for using S3 as YARN's resource storage dir -via the `fs.defaultFS` configuration property in Hadoop's `core-site.xml`. - -#### Set S3 FileSystem - -Interaction with S3 happens via one of [Hadoop's S3 FileSystem clients](https://wiki.apache.org/hadoop/AmazonS3): - -1. `S3AFileSystem` (**recommended** for Hadoop 2.7 and later): file system for reading and writing regular files using Amazon's SDK internally. No maximum file size and works with IAM roles. -2. `NativeS3FileSystem` (for Hadoop 2.6 and earlier): file system for reading and writing regular files. Maximum object size is 5GB and does not work with IAM roles. - -##### `S3AFileSystem` (Recommended) - -This is the recommended S3 FileSystem implementation to use. It uses Amazon's SDK internally and works with IAM roles (see [Configure Access Credentials](#configure-access-credentials-1)). - -You need to point Flink to a valid Hadoop configuration, which contains the following properties in `core-site.xml`: - -{% highlight xml %} -<configuration> - -<property> - <name>fs.s3.impl</name> - <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> -</property> - -<!-- Comma separated list of local directories used to buffer - large results prior to transmitting them to S3. --> -<property> - <name>fs.s3a.buffer.dir</name> - <value>/tmp</value> -</property> - -</configuration> -{% endhighlight %} - -This registers `S3AFileSystem` as the default FileSystem for URIs with the `s3a://` scheme. 
- -##### `NativeS3FileSystem` - -This file system is limited to files up to 5GB in size and it does not work with IAM roles (see [Configure Access Credentials](#configure-access-credentials-1)), meaning that you have to manually configure your AWS credentials in the Hadoop config file. - -You need to point Flink to a valid Hadoop configuration, which contains the following property in `core-site.xml`: - -{% highlight xml %} -<property> - <name>fs.s3.impl</name> - <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value> -</property> -{% endhighlight %} - -This registers `NativeS3FileSystem` as the default FileSystem for URIs with the `s3://` scheme. - -{% top %} - -#### Hadoop Configuration - -You can specify the [Hadoop configuration](../config.html#hdfs) in various ways pointing Flink to -the path of the Hadoop configuration directory, for example -- by setting the environment variable `HADOOP_CONF_DIR`, or -- by setting the `fs.hdfs.hadoopconf` configuration option in `flink-conf.yaml`: -{% highlight yaml %} -fs.hdfs.hadoopconf: /path/to/etc/hadoop -{% endhighlight %} - -This registers `/path/to/etc/hadoop` as Hadoop's configuration directory with Flink. Flink will look for the `core-site.xml` and `hdfs-site.xml` files in the specified directory. - -{% top %} - -#### Configure Access Credentials - -{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %} - -After setting up the S3 FileSystem, you need to make sure that Flink is allowed to access your S3 buckets. - -##### Identity and Access Management (IAM) (Recommended) - -When using `S3AFileSystem`, the recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need in order to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/ [...] - -If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink. - -Note that this only works with `S3AFileSystem` and not `NativeS3FileSystem`. - -{% top %} - -##### Access Keys with `S3AFileSystem` (Discouraged) - -Access to S3 can be granted via your **access and secret key pair**. Please note that this is discouraged since the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). - -For `S3AFileSystem` you need to configure both `fs.s3a.access.key` and `fs.s3a.secret.key` in Hadoop's `core-site.xml`: - -{% highlight xml %} -<property> - <name>fs.s3a.access.key</name> - <value></value> -</property> - -<property> - <name>fs.s3a.secret.key</name> - <value></value> -</property> -{% endhighlight %} - -{% top %} - -##### Access Keys with `NativeS3FileSystem` (Discouraged) - -Access to S3 can be granted via your **access and secret key pair**. But this is discouraged and you should use `S3AFileSystem` [with the required IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). 
- -For `NativeS3FileSystem` you need to configure both `fs.s3.awsAccessKeyId` and `fs.s3.awsSecretAccessKey` in Hadoop's `core-site.xml`: - -{% highlight xml %} -<property> - <name>fs.s3.awsAccessKeyId</name> - <value></value> -</property> - -<property> - <name>fs.s3.awsSecretAccessKey</name> - <value></value> -</property> -{% endhighlight %} - -{% top %} - -#### Provide S3 FileSystem Dependency - -{% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](#emr-elastic-mapreduce). %} - -Hadoop's S3 FileSystem clients are packaged in the `hadoop-aws` artifact (Hadoop version 2.6 and later). This JAR and all its dependencies need to be added to Flink's classpath, i.e. the class path of both Job and TaskManagers. Depending on which FileSystem implementation and which Flink and Hadoop version you use, you need to provide different dependencies (see below). - -There are multiple ways of adding JARs to Flink's class path, the easiest being simply to drop the JARs in Flink's `lib` folder. You need to copy the `hadoop-aws` JAR with all its dependencies. You can also export the directory containing these JARs as part of the `HADOOP_CLASSPATH` environment variable on all machines. - -##### Flink for Hadoop 2.7 - -Depending on which file system you use, please add the following dependencies. You can find these as part of the Hadoop binaries in `hadoop-2.7/share/hadoop/tools/lib`: - -- `S3AFileSystem`: - - `hadoop-aws-2.7.3.jar` - - `aws-java-sdk-s3-1.11.183.jar` and its dependencies: - - `aws-java-sdk-core-1.11.183.jar` - - `aws-java-sdk-kms-1.11.183.jar` - - `jackson-annotations-2.6.7.jar` - - `jackson-core-2.6.7.jar` - - `jackson-databind-2.6.7.jar` - - `joda-time-2.8.1.jar` - - `httpcore-4.4.4.jar` - - `httpclient-4.5.3.jar` - -- `NativeS3FileSystem`: - - `hadoop-aws-2.7.3.jar` - - `guava-11.0.2.jar` - -Note that `hadoop-common` is available as part of Flink, but Guava is shaded by Flink. - -##### Flink for Hadoop 2.6 - -Depending on which file system you use, please add the following dependencies. You can find these as part of the Hadoop binaries in `hadoop-2.6/share/hadoop/tools/lib`: - -- `S3AFileSystem`: - - `hadoop-aws-2.6.4.jar` - - `aws-java-sdk-1.7.4.jar` and its dependencies: - - `jackson-annotations-2.1.1.jar` - - `jackson-core-2.1.1.jar` - - `jackson-databind-2.1.1.jar` - - `joda-time-2.2.jar` - - `httpcore-4.2.5.jar` - - `httpclient-4.2.5.jar` - -- `NativeS3FileSystem`: - - `hadoop-aws-2.6.4.jar` - - `guava-11.0.2.jar` - -Note that `hadoop-common` is available as part of Flink, but Guava is shaded by Flink. - -##### Flink for Hadoop 2.4 and earlier - -These Hadoop versions only have support for `NativeS3FileSystem`. This comes pre-packaged with Flink for Hadoop 2 as part of `hadoop-common`. You don't need to add anything to the classpath. - -{% top %} - -## Common Issues - -The following sections lists common issues when working with Flink on AWS. - -### Missing S3 FileSystem Configuration - -If your job submission fails with an Exception message noting that `No file system found with scheme s3` this means that no FileSystem has been configured for S3. Please check out the configuration sections for our [shaded Hadoop/Presto](#shaded-hadooppresto-s3-file-systems-recommended) or [generic Hadoop](#set-s3-filesystem) file systems for details on how to configure this properly. 
- -{% highlight plain %} -org.apache.flink.client.program.ProgramInvocationException: The program execution failed: - Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...] -Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: - No file system found with scheme s3, referenced in file URI 's3://<bucket>/<endpoint>'. [...] -Caused by: java.io.IOException: No file system found with scheme s3, - referenced in file URI 's3://<bucket>/<endpoint>'. - at o.a.f.core.fs.FileSystem.get(FileSystem.java:296) - at o.a.f.core.fs.Path.getFileSystem(Path.java:311) - at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450) - at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57) - at o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156) -{% endhighlight %} - -{% top %} - -### AWS Access Key ID and Secret Access Key Not Specified - -If you see your job failing with an Exception noting that the `AWS Access Key ID and Secret Access Key must be specified as the username or password`, your access credentials have not been set up properly. Please refer to the access credential section for our [shaded Hadoop/Presto](#configure-access-credentials) or [generic Hadoop](#configure-access-credentials-1) file systems for details on how to configure this. - -{% highlight plain %} -org.apache.flink.client.program.ProgramInvocationException: The program execution failed: - Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...] -Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) points to the - HDFS NameNode at <bucket>, but the File System could not be initialized with that address: - AWS Access Key ID and Secret Access Key must be specified as the username or password - (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId - or fs.s3n.awsSecretAccessKey properties (respectively) [...] -Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must - be specified as the username or password (respectively) of a s3 URL, or by setting - the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively) [...] - at o.a.h.fs.s3.S3Credentials.initialize(S3Credentials.java:70) - at o.a.h.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80) - at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) - at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) - at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) - at java.lang.reflect.Method.invoke(Method.java:606) - at o.a.h.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) - at o.a.h.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) - at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source) - at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330) - at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321) -{% endhighlight %} - -{% top %} - -### ClassNotFoundException: NativeS3FileSystem/S3AFileSystem Not Found - -If you see this Exception, the S3 FileSystem is not part of the class path of Flink. Please refer to [S3 FileSystem dependency section](#provide-s3-filesystem-dependency) for details on how to configure this properly. 
- -{% highlight plain %} -Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found - at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186) - at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460) - at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352) - at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280) - at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57) - at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156) - ... 25 more -Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found - at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2154) - at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2178) - ... 32 more -Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found - at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060) - at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152) - ... 33 more -{% endhighlight %} - -{% top %} - -### IOException: `400: Bad Request` - -If you have configured everything properly, but get a `Bad Request` Exception **and** your S3 bucket is located in region `eu-central-1`, you might be running an S3 client, which does not support [Amazon's signature version 4](http://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html). - -{% highlight plain %} -[...] -Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 : Bad Request [...] -Caused by: org.jets3t.service.impl.rest.HttpException [...] -{% endhighlight %} -or -{% highlight plain %} -com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: [...] - -{% endhighlight %} - -This should not apply to our shaded Hadoop/Presto S3 file systems but can occur for Hadoop-provided -S3 file systems. In particular, all Hadoop versions up to 2.7.2 running `NativeS3FileSystem` (which -depend on `JetS3t 0.9.0` instead of a version [>= 0.9.4](http://www.jets3t.org/RELEASE_NOTES.html)) -are affected but users also reported this happening with the `S3AFileSystem`. - -Except for changing the bucket region, you may also be able to solve this by -[requesting signature version 4 for request authentication](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version), -e.g. by adding this to Flink's JVM options in `flink-conf.yaml` (see -[configuration](../config.html#common-options)): -{% highlight yaml %} -env.java.opts: -Dcom.amazonaws.services.s3.enableV4 -{% endhighlight %} - -{% top %} - -### NullPointerException at org.apache.hadoop.fs.LocalDirAllocator - -This Exception is usually caused by skipping the local buffer directory configuration `fs.s3a.buffer.dir` for the `S3AFileSystem`. Please refer to the [S3AFileSystem configuration](#s3afilesystem-recommended) section to see how to configure the `S3AFileSystem` properly. 
- -{% highlight plain %} -[...] -Caused by: java.lang.NullPointerException at -o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at -o.a.h.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344) at -o.a.h.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416) at -o.a.h.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198) at -o.a.h.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87) at -o.a.h.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410) at -o.a.h.fs.FileSystem.create(FileSystem.java:907) at -o.a.h.fs.FileSystem.create(FileSystem.java:888) at -o.a.h.fs.FileSystem.create(FileSystem.java:785) at -o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at -o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at -... 25 more -{% endhighlight %} - -{% top %} diff --git a/docs/ops/filesystems/common.md b/docs/ops/filesystems/common.md index edef1dd..a8a371e 100644 --- a/docs/ops/filesystems/common.md +++ b/docs/ops/filesystems/common.md @@ -22,14 +22,14 @@ specific language governing permissions and limitations under the License. --> -Apache Flink provides a number of common configuration settings that work across all file system implementations. +Apache Flink provides several standard configuration settings that work across all file system implementations. * This will be replaced by the TOC {:toc} ## Default File System -If paths to files do not explicitly specify a file system scheme (and authority), a default scheme (and authority) will be used. +A default scheme (and authority) is used if paths to files do not explicitly specify a file system scheme (and authority). {% highlight yaml %} fs.default-scheme: <default-fs> @@ -40,10 +40,10 @@ For example, if the default file system configured as `fs.default-scheme: hdfs:/ ## Connection limiting -You can limit the total number of connections that a file system can concurrently open. This is useful when the file system cannot handle a large number -of concurrent reads / writes or open connections at the same time. +You can limit the total number of connections that a file system can concurrently open which is useful when the file system cannot handle a large number +of concurrent reads/writes or open connections at the same time. -For example, very small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint. +For example, small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint. To limit a specific file system's connections, add the following entries to the Flink configuration. The file system to be limited is identified by its scheme. @@ -56,13 +56,14 @@ fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite) fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite) {% endhighlight %} -You can limit the number if input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on -the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation will block until some streams are closed. -If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, the stream opening will fail. 
+You can limit the number of input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on +the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation blocks until some streams close. +If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, the stream opening fails. -To prevent inactive streams from taking up the complete pool (preventing new connections to be opened), you can add an inactivity timeout for streams: -`fs.<scheme>.limit.stream-timeout`. If a stream does not read/write any bytes for at least that amount of time, it is forcibly closed. +To prevent inactive streams from taking up the full pool (preventing new connections to be opened), you can add an inactivity timeout which forcibly closes them if they do not read/write any bytes for at least that amount of time: `fs.<scheme>.limit.stream-timeout`. -These limits are enforced per TaskManager, so each TaskManager in a Flink application or cluster will open up to that number of connections. -In addition, the limits are also only enforced per FileSystem instance. Because File Systems are created per scheme and authority, different -authorities will have their own connection pool. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools. \ No newline at end of file +These limits are enforced per TaskManager and per file system. +Because file systems are created per scheme and authority, different +authorities have independent connection pools. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools. + +{% top %} \ No newline at end of file diff --git a/docs/ops/filesystems/common.zh.md b/docs/ops/filesystems/common.zh.md index edef1dd..a8a371e 100644 --- a/docs/ops/filesystems/common.zh.md +++ b/docs/ops/filesystems/common.zh.md @@ -22,14 +22,14 @@ specific language governing permissions and limitations under the License. --> -Apache Flink provides a number of common configuration settings that work across all file system implementations. +Apache Flink provides several standard configuration settings that work across all file system implementations. * This will be replaced by the TOC {:toc} ## Default File System -If paths to files do not explicitly specify a file system scheme (and authority), a default scheme (and authority) will be used. +A default scheme (and authority) is used if paths to files do not explicitly specify a file system scheme (and authority). {% highlight yaml %} fs.default-scheme: <default-fs> {% endhighlight %} @@ -40,10 +40,10 @@ For example, if the default file system configured as `fs.default-scheme: hdfs:/ ## Connection limiting -You can limit the total number of connections that a file system can concurrently open. This is useful when the file system cannot handle a large number -of concurrent reads / writes or open connections at the same time. +You can limit the total number of connections that a file system can concurrently open which is useful when the file system cannot handle a large number +of concurrent reads/writes or open connections at the same time. -For example, very small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint. +For example, small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint.
To limit a specific file system's connections, add the following entries to the Flink configuration. The file system to be limited is identified by its scheme. @@ -56,13 +56,14 @@ fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite) fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite) {% endhighlight %} -You can limit the number if input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on -the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation will block until some streams are closed. -If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, the stream opening will fail. +You can limit the number of input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on +the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation blocks until some streams close. +If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, the stream opening fails. -To prevent inactive streams from taking up the complete pool (preventing new connections to be opened), you can add an inactivity timeout for streams: -`fs.<scheme>.limit.stream-timeout`. If a stream does not read/write any bytes for at least that amount of time, it is forcibly closed. +To prevent inactive streams from taking up the full pool (preventing new connections to be opened), you can add an inactivity timeout which forcibly closes them if they do not read/write any bytes for at least that amount of time: `fs.<scheme>.limit.stream-timeout`. -These limits are enforced per TaskManager, so each TaskManager in a Flink application or cluster will open up to that number of connections. -In addition, the limits are also only enforced per FileSystem instance. Because File Systems are created per scheme and authority, different -authorities will have their own connection pool. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools. \ No newline at end of file +These limits are enforced per TaskManager and per file system. +Because file systems are created per scheme and authority, different +authorities have independent connection pools. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools. + +{% top %} \ No newline at end of file diff --git a/docs/ops/filesystems/index.md b/docs/ops/filesystems/index.md index be8bbee..b8548c1 100644 --- a/docs/ops/filesystems/index.md +++ b/docs/ops/filesystems/index.md @@ -24,40 +24,37 @@ specific language governing permissions and limitations under the License. --> -Apache Flink uses file system for both ingest and output of data for streaming and batch applications as well as targets for checkpoint storage. -These file systems can be local such as *Unix*, distributed like *HDFS*, or even object stores such as *S3*. +Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery. +These include most of the popular file systems: *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS* and *Aliyun OSS*. The file system used for a particular file is determined by its URI scheme.
For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster. -FileSystem instances are instantiated once per process and then cached / pooled, to -avoid configuration overhead per stream creation and to enforce certain constraints, such as connection/stream limits. +File system instances are instantiated once per process and then cached/pooled, to avoid configuration overhead per stream creation and to enforce certain constraints, such as connection/stream limits. * This will be replaced by the TOC {:toc} ### Built-in File Systems -Flink ships with support for most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS* and *OpenStack Swift FS*. -Each is identified by the scheme included in the URI of the provide file path. - Flink ships with implementations for the following file systems: - - **local**: This file system is used when the scheme is *"file://"*, and it represents the file system of the local machine, including any NFS or SAN that is mounted into that local file system. + - **local**: This file system is used when the scheme is *"file://"*, and it represents the file system of the local machine, including any NFS or SAN drives mounted into that local file system. - **S3**: Flink directly provides file systems to talk to Amazon S3 with two alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. Both implementations are self-contained with no dependency footprint. - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath. - **OpenStack Swift FS**: Flink directly provides a file system to talk to the OpenStack Swift file system, registered under the scheme *"swift://"*. - The implementation `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. + The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. To use it when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}` When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder. #### HDFS and Hadoop File System support -For all schemes where Flink cannot find a directly supported file system, it will fall back to Hadoop. -All Hadoop file systems are automatically available when `flink-runtime` and the Hadoop libraries are in classpath. +For all schemes where Flink cannot find a directly supported file system, it falls back to Hadoop. +All Hadoop file systems are automatically available when `flink-runtime` and the Hadoop libraries are on the classpath. + This way, Flink seamlessly supports all of Hadoop file systems, and all Hadoop-compatible file systems (HCFS). @@ -67,12 +64,25 @@ This way, Flink seamlessly supports all of Hadoop file systems, and all Hadoop-c - **har** - ... +##### Hadoop Configuration + +We recommend using Flink's built-in file systems them unless required otherwise, e.g., for using that file system as YARN's resource storage dir via the `fs.defaultFS` configuration property in Hadoop's `core-site.xml`. 
+ +If using a Hadoop file system, you can specify the [configuration](../config.html#hdfs) by setting the environment variable `HADOOP_CONF_DIR`, or by setting the `fs.hdfs.hadoopconf` configuration option in `flink-conf.yaml`. + +{% highlight yaml %} +fs.hdfs.hadoopconf: /path/to/etc/hadoop +{% endhighlight %} + +This registers `/path/to/etc/hadoop` as Hadoop's configuration directory and is where Flink will look for the `core-site.xml` and `hdfs-site.xml` files. + + ## Adding new File System Implementations File systems are represented via the `org.apache.flink.core.fs.FileSystem` class, which captures the ways to access and modify files and objects in that file system. -Implementations are discovered by Flink through Java's service abstraction, making it easy to add additional file system implementations. +Implementations are discovered by Flink through Java's service abstraction, making it easy to add new file system implementations. -In order to add a new File System, the following steps are needed: +To add a new file system: - Add the File System implementation, which is a subclass of `org.apache.flink.core.fs.FileSystem`. - Add a factory that instantiates that file system and declares the scheme under which the FileSystem is registered. This must be a subclass of `org.apache.flink.core.fs.FileSystemFactory`. diff --git a/docs/ops/filesystems/index.zh.md b/docs/ops/filesystems/index.zh.md index 9390239..7f2de4e 100644 --- a/docs/ops/filesystems/index.zh.md +++ b/docs/ops/filesystems/index.zh.md @@ -24,66 +24,39 @@ specific language governing permissions and limitations under the License. --> -This page provides details on setting up and configuring different file systems for use with Flink. -We start by describing how to use and configure the different file systems that are supported by Flink -out-of-the-box, before describing the necessary steps in order to add support about other/custom file system -implementations. +Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery. +These are some of most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS* and *Aliyum OSS*. -## Flink's File System support +The file system used for a particular file is determined by its URI scheme. +For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster. -Flink uses file systems both as *sources* and *sinks* in streaming/batch applications and as a target for *checkpointing*. -These file systems can for example be *Unix/Windows file systems*, *HDFS*, or even object stores like *S3*. +File system instances are instantiated once per process and then cached/pooled, to avoid configuration overhead per stream creation and to enforce certain constraints, such as connection/stream limits. -The file system used for a specific file is determined by the file URI's scheme. For example `file:///home/user/text.txt` refers to -a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` refers to a file in a specific HDFS cluster. - -File systems are represented via the `org.apache.flink.core.fs.FileSystem` class, which captures the ways to access and modify -files and objects in that file system. 
FileSystem instances are instantiated once per process and then cached / pooled, to -avoid configuration overhead per stream creation and to enforce certain constraints, such as connection/stream limits. +* This will be replaced by the TOC +{:toc} ### Built-in File Systems -Flink ships with support for most of the popular file systems, namely *local*, *hadoop-compatible*, *S3*, *MapR FS* -and *OpenStack Swift FS*. Each of these is identified by the scheme included in the URI of the provide file path. - Flink ships with implementations for the following file systems: - - **local**: This file system is used when the scheme is *"file://"*, and it represents the file system of the local machine, -including any NFS or SAN that is mounted into that local file system. - - - **S3**: Flink directly provides file systems to talk to Amazon S3. There are two alternative implementations, `flink-s3-fs-presto` - and `flink-s3-fs-hadoop`. Both implementations are self-contained with no dependency footprint. There is no need to add Hadoop to - the classpath to use them. Both internally use some Hadoop code, but "shade away" all classes to avoid any dependency conflicts. - - - `flink-s3-fs-presto`, registered under the scheme *"s3://"* and *"s3p://"*, is based on code from the [Presto project](https://prestodb.io/). - You can configure it the same way you can [configure the Presto file system](https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration). - - - `flink-s3-fs-hadoop`, registered under *"s3://"* and *"s3a://"*, based on code from the [Hadoop Project](https://hadoop.apache.org/). - The file system can be [configured exactly like Hadoop's s3a](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A). + - **local**: This file system is used when the scheme is *"file://"*, and it represents the file system of the local machine, including any NFS or SAN drives mounted into that local file system. - To use those file systems when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-s3-fs-presto:{{ site.version }}` - or `org.apache.flink:flink-s3-fs-hadoop:{{ site.version }}`). When starting a Flink application from the Flink binaries, copy or move - the respective jar file from the `opt` folder to the `lib` folder. See also [AWS setup](deployment/aws.html) for additional details. - - <span class="label label-danger">Attention</span>: As described above, both Hadoop and Presto "listen" to paths with scheme set to *"s3://"*. This is - convenient for switching between implementations (Hadoop or Presto), but it can lead to non-determinism when both - implementations are required. This can happen when, for example, the job uses the [StreamingFileSink]({{ site.baseurl}}/dev/connectors/streamfile_sink.html) - which only supports Hadoop, but uses Presto for checkpointing. In this case, it is advised to use explicitly *"s3a://"* - as a scheme for the sink (Hadoop) and *"s3p://"* for checkpointing (Presto). + - **S3**: Flink directly provides file systems to talk to Amazon S3 with two alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. Both implementations are self-contained with no dependency footprint. - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath. - **OpenStack Swift FS**: Flink directly provides a file system to talk to the OpenStack Swift file system, registered under the scheme *"swift://"*. 
- The implementation `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. + The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. To use it when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`). When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder. #### HDFS and Hadoop File System support -For all schemes where it cannot find a directly supported file system, Flink will try to use Hadoop to instantiate a file system for the respective scheme. -All Hadoop file systems are automatically available once `flink-runtime` and the Hadoop libraries are in classpath. +For all schemes where Flink cannot find a directly supported file system, it falls back to Hadoop. +All Hadoop file systems are automatically available when `flink-runtime` and the Hadoop libraries are on the classpath. + -That way, Flink seamlessly supports all of Hadoop file systems, and all Hadoop-compatible file systems (HCFS), for example: +This way, Flink seamlessly supports all Hadoop file systems and all Hadoop-compatible file systems (HCFS). - **hdfs** - **ftp** @@ -91,84 +64,25 @@ That way, Flink seamlessly supports all of Hadoop file systems, and all Hadoop-c - **har** - ... +##### Hadoop Configuration -## Common File System configurations - -The following configuration settings exist across different file systems. - -#### Default File System - -If paths to files do not explicitly specify a file system scheme (and authority), a default scheme (and authority) will be used. - -{% highlight yaml %} -fs.default-scheme: <default-fs> -{% endhighlight %} - -For example, if the default file system configured as `fs.default-scheme: hdfs://localhost:9000/`, then a file path of -`/user/hugo/in.txt` is interpreted as `hdfs://localhost:9000/user/hugo/in.txt`. - -#### Connection limiting - -You can limit the total number of connections that a file system can concurrently open. This is useful when the file system cannot handle a large number -of concurrent reads / writes or open connections at the same time. +We recommend using Flink's built-in file systems unless required otherwise, e.g., for using that file system as YARN's resource storage dir via the `fs.defaultFS` configuration property in Hadoop's `core-site.xml`. -For example, very small HDFS clusters with few RPC handlers can sometimes be overwhelmed by a large Flink job trying to build up many connections during a checkpoint. - -To limit a specific file system's connections, add the following entries to the Flink configuration. The file system to be limited is identified by -its scheme. +If using a Hadoop file system, you can specify the [configuration](../config.html#hdfs) by setting the environment variable `HADOOP_CONF_DIR`, or by setting the `fs.hdfs.hadoopconf` configuration option in `flink-conf.yaml`.
{% highlight yaml %} -fs.<scheme>.limit.total: (number, 0/-1 mean no limit) -fs.<scheme>.limit.input: (number, 0/-1 mean no limit) -fs.<scheme>.limit.output: (number, 0/-1 mean no limit) -fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite) -fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite) +fs.hdfs.hadoopconf: /path/to/etc/hadoop {% endhighlight %} -You can limit the number if input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on -the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation will block until some streams are closed. -If the opening of the stream takes longer than `fs.<scheme>.limit.timeout`, the stream opening will fail. - -To prevent inactive streams from taking up the complete pool (preventing new connections to be opened), you can add an inactivity timeout for streams: -`fs.<scheme>.limit.stream-timeout`. If a stream does not read/write any bytes for at least that amount of time, it is forcibly closed. - -These limits are enforced per TaskManager, so each TaskManager in a Flink application or cluster will open up to that number of connections. -In addition, the limits are also only enforced per FileSystem instance. Because File Systems are created per scheme and authority, different -authorities will have their own connection pool. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools. - -## Entropy injection for S3 file systems - -The bundled S3 file systems (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) support entropy injection. Entropy injection is -a technique to improve scalability of AWS S3 buckets through adding some random characters near the beginning of the key. - -If entropy injection is activated, a configured substring in the paths will be replaced by random characters. For example, path -`s3://my-bucket/checkpoints/_entropy_/dashboard-job/` would be replaced by something like `s3://my-bucket/checkpoints/gf36ikvg/dashboard-job/`. - -**Note that this only happens when the file creation passes the option to inject entropy!**, otherwise the file path will -simply remove the entropy key substring. See -[FileSystem.create(Path, WriteOption)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-) -for details. - -*Note: The Flink runtime currently passes the option to inject entropy only to checkpoint data files.* -*All other files, including checkpoint metadata and external URI do not inject entropy, to keep checkpoint URIs predictable.* - -To enable entropy injection, configure the *entropy key* and the *entropy length* parameters. - -``` -s3.entropy.key: _entropy_ -s3.entropy.length: 4 (default) - -``` +This registers `/path/to/etc/hadoop` as Hadoop's configuration directory and is where Flink will look for the `core-site.xml` and `hdfs-site.xml` files. -The `s3.entropy.key` defines the string in paths that is replaced by the random characters. Paths that do not contain the entropy key are left unchanged. -If a file system operation does not pass the *"inject entropy"* write option, the entropy key substring is simply removed. -The `s3.entropy.length` defined the number of random alphanumeric characters to replace the entropy key with. 
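Alternatively, the same directory can be picked up from the environment before the cluster is started. The following is a minimal sketch, assuming the Hadoop configuration files live under `/path/to/etc/hadoop`:

{% highlight bash %}
# Point Flink (and the Hadoop file systems it instantiates) at the Hadoop configuration directory
export HADOOP_CONF_DIR=/path/to/etc/hadoop
./bin/start-cluster.sh
{% endhighlight %}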
## Adding new File System Implementations -File system implementations are discovered by Flink through Java's service abstraction, making it easy to add additional file system implementations. +File systems are represented via the `org.apache.flink.core.fs.FileSystem` class, which captures the ways to access and modify files and objects in that file system. +Implementations are discovered by Flink through Java's service abstraction, making it easy to add new file system implementations. -In order to add a new File System, the following steps are needed: +To add a new file system: - Add the File System implementation, which is a subclass of `org.apache.flink.core.fs.FileSystem`. - Add a factory that instantiates that file system and declares the scheme under which the FileSystem is registered. This must be a subclass of `org.apache.flink.core.fs.FileSystemFactory`.
diff --git a/docs/ops/filesystems/oss.md b/docs/ops/filesystems/oss.md index 8711e6f..0c98c43 100644 --- a/docs/ops/filesystems/oss.md +++ b/docs/ops/filesystems/oss.md @@ -23,16 +23,15 @@ specific language governing permissions and limitations under the License. --> -* This will be replaced by the TOC -{:toc} - ## OSS: Object Storage Service -[Aliyun Object Storage Service](https://www.aliyun.com/product/oss) (Aliyun OSS) is widely used especially among China’s cloud users, and it provides cloud object storage for a variety of use cases. +[Aliyun Object Storage Service](https://www.aliyun.com/product/oss) (Aliyun OSS) is widely used, particularly among China’s cloud users, and it provides cloud object storage for a variety of use cases. +You can use OSS with Flink for **reading** and **writing data** as well as in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html). -[Hadoop file system](http://hadoop.apache.org/docs/current/hadoop-aliyun/tools/hadoop-aliyun/index.html) supports OSS since version 2.9.1. Now, you can also use OSS with Fink for **reading** and **writing data**. +* This will be replaced by the TOC +{:toc} -You can access OSS objects like this: +You can use OSS objects like regular files by specifying paths in the following format: {% highlight plain %} oss://<your-bucket>/<object-name> @@ -45,15 +44,15 @@ Below shows how to use OSS with Flink: env.readTextFile("oss://<your-bucket>/<object-name>"); // Write to OSS bucket -dataSet.writeAsText("oss://<your-bucket>/<object-name>") +stream.writeAsText("oss://<your-bucket>/<object-name>") +// Use OSS as the FsStateBackend +env.setStateBackend(new FsStateBackend("oss://<your-bucket>/<object-name>")); {% endhighlight %} -There are two ways to use OSS with Flink, our shaded `flink-oss-fs-hadoop` will cover most scenarios. However, you may need to set up a specific Hadoop OSS FileSystem implementation if you want use OSS as YARN's resource storage dir ([This patch](https://issues.apache.org/jira/browse/HADOOP-15919) enables YARN to use OSS). Both ways are described below. - -### Shaded Hadoop OSS file system (recommended) +### Shaded Hadoop OSS file system -In order to use `flink-oss-fs-hadoop`, copy the respective JAR file from the opt directory to the lib directory of your Flink distribution before starting Flink, e.g. +To use `flink-oss-fs-hadoop`, copy the respective JAR file from the opt directory to the lib directory of your Flink distribution before starting Flink, e.g.
{% highlight bash %} cp ./opt/flink-oss-fs-hadoop-{{ site.version }}.jar ./lib/ @@ -64,7 +63,7 @@ cp ./opt/flink-oss-fs-hadoop-{{ site.version }}.jar ./lib/ #### Configurations setup After setting up the OSS FileSystem wrapper, you need to add some configurations to make sure that Flink is allowed to access your OSS buckets. -In order to use OSS with Flink more easily, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml` +To allow for easy adoption, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml` You can see the configuration keys in the [Hadoop OSS documentation](http://hadoop.apache.org/docs/current/hadoop-aliyun/tools/hadoop-aliyun/index.html). @@ -76,157 +75,4 @@ fs.oss.accessKeyId: Aliyun access key ID fs.oss.accessKeySecret: Aliyun access key secret {% endhighlight %} -### Hadoop-provided OSS file system - manual setup -This setup is a bit more complex and we recommend using our shaded Hadoop file systems instead (see above) unless required otherwise, e.g. for using OSS as YARN’s resource storage dir via the fs.defaultFS configuration property in Hadoop’s core-site.xml. - -#### Set OSS FileSystem -You need to point Flink to a valid Hadoop configuration, which contains the following properties in core-site.xml: - -{% highlight xml %} -<configuration> - -<property> - <name>fs.oss.impl</name> - <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value> - </property> - - <property> - <name>fs.oss.endpoint</name> - <value>Aliyun OSS endpoint to connect to</value> - <description>Aliyun OSS endpoint to connect to. An up-to-date list is provided in the Aliyun OSS Documentation.</description> - </property> - - <property> - <name>fs.oss.accessKeyId</name> - <description>Aliyun access key ID</description> - </property> - - <property> - <name>fs.oss.accessKeySecret</name> - <description>Aliyun access key secret</description> - </property> - - <property> - <name>fs.oss.buffer.dir</name> - <value>/tmp/oss</value> - </property> - -</property> - -</configuration> -{% endhighlight %} - -#### Hadoop Configuration - -You can specify the [Hadoop configuration](../config.html#hdfs) in various ways pointing Flink to -the path of the Hadoop configuration directory, for example -- by setting the environment variable `HADOOP_CONF_DIR`, or -- by setting the `fs.hdfs.hadoopconf` configuration option in `flink-conf.yaml`: -{% highlight yaml %} -fs.hdfs.hadoopconf: /path/to/etc/hadoop -{% endhighlight %} - -This registers `/path/to/etc/hadoop` as Hadoop's configuration directory with Flink. Flink will look for the `core-site.xml` and `hdfs-site.xml` files in the specified directory. - -#### Provide OSS FileSystem Dependency - -You can find Hadoop OSS FileSystem are packaged in the hadoop-aliyun artifact. This JAR and all its dependencies need to be added to Flink’s classpath, i.e. the class path of both Job and TaskManagers. - -There are multiple ways of adding JARs to Flink’s class path, the easiest being simply to drop the JARs in Flink’s lib folder. You need to copy the hadoop-aliyun JAR with all its dependencies (You can find these as part of the Hadoop binaries in hadoop-3/share/hadoop/tools/lib). You can also export the directory containing these JARs as part of the HADOOP_CLASSPATH environment variable on all machines. 
- -## An Example -Below is an example shows the result of our setup (data is generated by TPC-DS tool) - -{% highlight java %} -// Read from OSS bucket -scala> val dataSet = benv.readTextFile("oss://<your-bucket>/50/call_center/data-m-00049") -dataSet: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@31940704 - -scala> dataSet.print() -1|AAAAAAAABAAAAAAA|1998-01-01|||2450952|NY Metro|large|2935|1670015|8AM-4PM|Bob Belcher|6|More than other authori|Shared others could not count fully dollars. New members ca|Julius Tran|3|pri|6|cally|730|Ash Hill|Boulevard|Suite 0|Oak Grove|Williamson County|TN|38370|United States|-5|0.11| -2|AAAAAAAACAAAAAAA|1998-01-01|2000-12-31||2450806|Mid Atlantic|medium|1574|594972|8AM-8AM|Felipe Perkins|2|A bit narrow forms matter animals. Consist|Largely blank years put substantially deaf, new others. Question|Julius Durham|5|anti|1|ought|984|Center Hill|Way|Suite 70|Midway|Williamson County|TN|31904|United States|-5|0.12| -3|AAAAAAAACAAAAAAA|2001-01-01|||2450806|Mid Atlantic|medium|1574|1084486|8AM-4PM|Mark Hightower|2|Wrong troops shall work sometimes in a opti|Largely blank years put substantially deaf, new others. Question|Julius Durham|1|ought|2|able|984|Center Hill|Way|Suite 70|Midway|Williamson County|TN|31904|United States|-5|0.01| -4|AAAAAAAAEAAAAAAA|1998-01-01|2000-01-01||2451063|North Midwest|medium|10137|6578913|8AM-4PM|Larry Mccray|2|Dealers make most historical, direct students|Rich groups catch longer other fears; future,|Matthew Clifton|4|ese|3|pri|463|Pine Ridge|RD|Suite U|Five Points|Ziebach County|SD|56098|United States|-6|0.05| -5|AAAAAAAAEAAAAAAA|2000-01-02|2001-12-31||2451063|North Midwest|small|17398|4610470|8AM-8AM|Larry Mccray|2|Dealers make most historical, direct students|Blue, due beds come. Politicians would not make far thoughts. Specifically new horses partic|Gary Colburn|4|ese|3|pri|463|Pine Ridge|RD|Suite U|Five Points|Ziebach County|SD|56098|United States|-6|0.12| -6|AAAAAAAAEAAAAAAA|2002-01-01|||2451063|North Midwest|medium|13118|6585236|8AM-4PM|Larry Mccray|5|Silly particles could pro|Blue, due beds come. Politicians would not make far thoughts. Specifically new horses partic|Gary Colburn|5|anti|3|pri|463|Pine Ridge|RD|Suite U|Five Points|Ziebach County|SD|56098|United States|-6|0.11| -7|AAAAAAAAHAAAAAAA|1998-01-01|||2451024|Pacific Northwest|small|6280|1739560|8AM-4PM|Alden Snyder|6|Major, formal states can suppor|Reduced, subsequent bases could not lik|Frederick Weaver|5|anti|4|ese|415|Jefferson Tenth|Court|Suite 180|Riverside|Walker County|AL|39231|United States|-6|0.00| -8|AAAAAAAAIAAAAAAA|1998-01-01|2000-12-31||2450808|California|small|4766|2459256|8AM-12AM|Wayne Ray|6|Here possible notions arrive only. 
Ar|Common, free creditors should exper|Daniel Weller|5|anti|2|able|550|Cedar Elm|Ct.|Suite I|Fairview|Williamson County|TN|35709|United States|-5|0.06| - -scala> dataSet.count() -res0: Long = 8 - -// Write to OSS bucket -scala> dataSet.writeAsText("oss://<your-bucket>/50/call_center/data-m-00049.1") - -scala> benv.execute("My batch program") -res1: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@77476fcf - -scala> val newDataSet = benv.readTextFile("oss://<your-bucket>/50/call_center/data-m-00049.1") -newDataSet: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@40b70f31 - -scala> newDataSet.count() -res2: Long = 8 - -{% endhighlight %} - -## Common Issues -### Could not find OSS file system -If your job submission fails with an Exception message like below, please check if our shaded jar (flink-oss-fs-hadoop-{{ site.version }}.jar) is in the lib dir. - -{% highlight plain %} -Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager - at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176) - at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058) - at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308) - at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) - ... 7 more -Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not find a file system implementation for scheme 'oss'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. - at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:273) - at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:827) - at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232) - at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100) - at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151) - at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131) - at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294) - at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157) - ... 10 more -Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'oss'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. - at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403) - at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318) - at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62) - at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:259) - ... 17 more -Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies. 
- at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64) - at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399) - ... 22 more -{% endhighlight %} - -### Missing configuration(s) -If your job submission fails with an Exception message like below, please check if the corresponding configurations exits in `flink-conf.yaml` - -{% highlight plain %} -Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Aliyun OSS endpoint should not be null or empty. Please set proper endpoint with 'fs.oss.endpoint'. - at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:273) - at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:827) - at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232) - at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100) - at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151) - at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131) - at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294) - at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157) - ... 10 more -Caused by: java.lang.IllegalArgumentException: Aliyun OSS endpoint should not be null or empty. Please set proper endpoint with 'fs.oss.endpoint'. - at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.initialize(AliyunOSSFileSystemStore.java:145) - at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:323) - at org.apache.flink.fs.osshadoop.OSSFileSystemFactory.create(OSSFileSystemFactory.java:87) - at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395) - at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318) - at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62) - at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:259) - ... 17 more -{% endhighlight %} +{% top %} diff --git a/docs/ops/filesystems/oss.zh.md b/docs/ops/filesystems/oss.zh.md index fa2dfb1..d6834d1 100644 --- a/docs/ops/filesystems/oss.zh.md +++ b/docs/ops/filesystems/oss.zh.md @@ -23,17 +23,15 @@ specific language governing permissions and limitations under the License. --> -* ToC -{:toc} - - ## OSS: Object Storage Service -[Aliyun Object Storage Service](https://www.aliyun.com/product/oss) (Aliyun OSS) is widely used especially among China’s cloud users, and it provides cloud object storage for a variety of use cases. +[Aliyun Object Storage Service](https://www.aliyun.com/product/oss) (Aliyun OSS) is widely used, particularly popular among China’s cloud users, and it provides cloud object storage for a variety of use cases. +You can use OSS with Flink for **reading** and **writing data** as well in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html) -[Hadoop file system](http://hadoop.apache.org/docs/current/hadoop-aliyun/tools/hadoop-aliyun/index.html) supports OSS since version 2.9.1. 
Now, you can also use OSS with Fink for **reading** and **writing data**. +* This will be replaced by the TOC +{:toc} -You can access OSS objects like this: +You can use OSS objects like regular files by specifying paths in the following format: {% highlight plain %} oss://<your-bucket>/<object-name> @@ -46,15 +44,15 @@ Below shows how to use OSS with Flink: env.readTextFile("oss://<your-bucket>/<object-name>"); // Write to OSS bucket -dataSet.writeAsText("oss://<your-bucket>/<object-name>") +stream.writeAsText("oss://<your-bucket>/<object-name>") +// Use OSS as the FsStateBackend +env.setStateBackend(new FsStateBackend("oss://<your-bucket>/<object-name>")); {% endhighlight %} -There are two ways to use OSS with Flink, our shaded `flink-oss-fs-hadoop` will cover most scenarios. However, you may need to set up a specific Hadoop OSS FileSystem implementation if you want use OSS as YARN's resource storage dir ([This patch](https://issues.apache.org/jira/browse/HADOOP-15919) enables YARN to use OSS). Both ways are described below. - -### Shaded Hadoop OSS file system (recommended) +### Shaded Hadoop OSS file system -In order to use `flink-oss-fs-hadoop`, copy the respective JAR file from the opt directory to the lib directory of your Flink distribution before starting Flink, e.g. +To use `flink-oss-fs-hadoop`, copy the respective JAR file from the opt directory to the lib directory of your Flink distribution before starting Flink, e.g. {% highlight bash %} cp ./opt/flink-oss-fs-hadoop-{{ site.version }}.jar ./lib/ @@ -65,7 +63,7 @@ cp ./opt/flink-oss-fs-hadoop-{{ site.version }}.jar ./lib/ #### Configurations setup After setting up the OSS FileSystem wrapper, you need to add some configurations to make sure that Flink is allowed to access your OSS buckets. -In order to use OSS with Flink more easily, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml` +To allow for easy adoption, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml` You can see the configuration keys in the [Hadoop OSS documentation](http://hadoop.apache.org/docs/current/hadoop-aliyun/tools/hadoop-aliyun/index.html). @@ -77,157 +75,4 @@ fs.oss.accessKeyId: Aliyun access key ID fs.oss.accessKeySecret: Aliyun access key secret {% endhighlight %} -### Hadoop-provided OSS file system - manual setup -This setup is a bit more complex and we recommend using our shaded Hadoop file systems instead (see above) unless required otherwise, e.g. for using OSS as YARN’s resource storage dir via the fs.defaultFS configuration property in Hadoop’s core-site.xml. - -#### Set OSS FileSystem -You need to point Flink to a valid Hadoop configuration, which contains the following properties in core-site.xml: - -{% highlight xml %} -<configuration> - -<property> - <name>fs.oss.impl</name> - <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value> - </property> - - <property> - <name>fs.oss.endpoint</name> - <value>Aliyun OSS endpoint to connect to</value> - <description>Aliyun OSS endpoint to connect to.
An up-to-date list is provided in the Aliyun OSS Documentation.</description> - </property> - - <property> - <name>fs.oss.accessKeyId</name> - <description>Aliyun access key ID</description> - </property> - - <property> - <name>fs.oss.accessKeySecret</name> - <description>Aliyun access key secret</description> - </property> - - <property> - <name>fs.oss.buffer.dir</name> - <value>/tmp/oss</value> - </property> - -</property> - -</configuration> -{% endhighlight %} - -#### Hadoop Configuration - -You can specify the [Hadoop configuration](../config.html#hdfs) in various ways pointing Flink to -the path of the Hadoop configuration directory, for example -- by setting the environment variable `HADOOP_CONF_DIR`, or -- by setting the `fs.hdfs.hadoopconf` configuration option in `flink-conf.yaml`: -{% highlight yaml %} -fs.hdfs.hadoopconf: /path/to/etc/hadoop -{% endhighlight %} - -This registers `/path/to/etc/hadoop` as Hadoop's configuration directory with Flink. Flink will look for the `core-site.xml` and `hdfs-site.xml` files in the specified directory. - -#### Provide OSS FileSystem Dependency - -You can find Hadoop OSS FileSystem are packaged in the hadoop-aliyun artifact. This JAR and all its dependencies need to be added to Flink’s classpath, i.e. the class path of both Job and TaskManagers. - -There are multiple ways of adding JARs to Flink’s class path, the easiest being simply to drop the JARs in Flink’s lib folder. You need to copy the hadoop-aliyun JAR with all its dependencies (You can find these as part of the Hadoop binaries in hadoop-3/share/hadoop/tools/lib). You can also export the directory containing these JARs as part of the HADOOP_CLASSPATH environment variable on all machines. - -## An Example -Below is an example shows the result of our setup (data is generated by TPC-DS tool) - -{% highlight java %} -// Read from OSS bucket -scala> val dataSet = benv.readTextFile("oss://<your-bucket>/50/call_center/data-m-00049") -dataSet: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@31940704 - -scala> dataSet.print() -1|AAAAAAAABAAAAAAA|1998-01-01|||2450952|NY Metro|large|2935|1670015|8AM-4PM|Bob Belcher|6|More than other authori|Shared others could not count fully dollars. New members ca|Julius Tran|3|pri|6|cally|730|Ash Hill|Boulevard|Suite 0|Oak Grove|Williamson County|TN|38370|United States|-5|0.11| -2|AAAAAAAACAAAAAAA|1998-01-01|2000-12-31||2450806|Mid Atlantic|medium|1574|594972|8AM-8AM|Felipe Perkins|2|A bit narrow forms matter animals. Consist|Largely blank years put substantially deaf, new others. Question|Julius Durham|5|anti|1|ought|984|Center Hill|Way|Suite 70|Midway|Williamson County|TN|31904|United States|-5|0.12| -3|AAAAAAAACAAAAAAA|2001-01-01|||2450806|Mid Atlantic|medium|1574|1084486|8AM-4PM|Mark Hightower|2|Wrong troops shall work sometimes in a opti|Largely blank years put substantially deaf, new others. Question|Julius Durham|1|ought|2|able|984|Center Hill|Way|Suite 70|Midway|Williamson County|TN|31904|United States|-5|0.01| -4|AAAAAAAAEAAAAAAA|1998-01-01|2000-01-01||2451063|North Midwest|medium|10137|6578913|8AM-4PM|Larry Mccray|2|Dealers make most historical, direct students|Rich groups catch longer other fears; future,|Matthew Clifton|4|ese|3|pri|463|Pine Ridge|RD|Suite U|Five Points|Ziebach County|SD|56098|United States|-6|0.05| -5|AAAAAAAAEAAAAAAA|2000-01-02|2001-12-31||2451063|North Midwest|small|17398|4610470|8AM-8AM|Larry Mccray|2|Dealers make most historical, direct students|Blue, due beds come. 
Politicians would not make far thoughts. Specifically new horses partic|Gary Colburn|4|ese|3|pri|463|Pine Ridge|RD|Suite U|Five Points|Ziebach County|SD|56098|United States|-6|0.12| -6|AAAAAAAAEAAAAAAA|2002-01-01|||2451063|North Midwest|medium|13118|6585236|8AM-4PM|Larry Mccray|5|Silly particles could pro|Blue, due beds come. Politicians would not make far thoughts. Specifically new horses partic|Gary Colburn|5|anti|3|pri|463|Pine Ridge|RD|Suite U|Five Points|Ziebach County|SD|56098|United States|-6|0.11| -7|AAAAAAAAHAAAAAAA|1998-01-01|||2451024|Pacific Northwest|small|6280|1739560|8AM-4PM|Alden Snyder|6|Major, formal states can suppor|Reduced, subsequent bases could not lik|Frederick Weaver|5|anti|4|ese|415|Jefferson Tenth|Court|Suite 180|Riverside|Walker County|AL|39231|United States|-6|0.00| -8|AAAAAAAAIAAAAAAA|1998-01-01|2000-12-31||2450808|California|small|4766|2459256|8AM-12AM|Wayne Ray|6|Here possible notions arrive only. Ar|Common, free creditors should exper|Daniel Weller|5|anti|2|able|550|Cedar Elm|Ct.|Suite I|Fairview|Williamson County|TN|35709|United States|-5|0.06| - -scala> dataSet.count() -res0: Long = 8 - -// Write to OSS bucket -scala> dataSet.writeAsText("oss://<your-bucket>/50/call_center/data-m-00049.1") - -scala> benv.execute("My batch program") -res1: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@77476fcf - -scala> val newDataSet = benv.readTextFile("oss://<your-bucket>/50/call_center/data-m-00049.1") -newDataSet: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@40b70f31 - -scala> newDataSet.count() -res2: Long = 8 - -{% endhighlight %} - -## Common Issues -### Could not find OSS file system -If your job submission fails with an Exception message like below, please check if our shaded jar (flink-oss-fs-hadoop-{{ site.version }}.jar) is in the lib dir. - -{% highlight plain %} -Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager - at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176) - at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058) - at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308) - at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) - ... 7 more -Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not find a file system implementation for scheme 'oss'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. - at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:273) - at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:827) - at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232) - at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100) - at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151) - at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131) - at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294) - at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157) - ... 
10 more -Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'oss'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. - at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403) - at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318) - at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62) - at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:259) - ... 17 more -Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies. - at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64) - at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399) - ... 22 more -{% endhighlight %} - -### Missing configuration(s) -If your job submission fails with an Exception message like below, please check if the corresponding configurations exits in `flink-conf.yaml` - -{% highlight plain %} -Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Aliyun OSS endpoint should not be null or empty. Please set proper endpoint with 'fs.oss.endpoint'. - at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:273) - at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:827) - at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232) - at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100) - at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151) - at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131) - at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294) - at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157) - ... 10 more -Caused by: java.lang.IllegalArgumentException: Aliyun OSS endpoint should not be null or empty. Please set proper endpoint with 'fs.oss.endpoint'. - at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.initialize(AliyunOSSFileSystemStore.java:145) - at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:323) - at org.apache.flink.fs.osshadoop.OSSFileSystemFactory.create(OSSFileSystemFactory.java:87) - at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395) - at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318) - at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587) - at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62) - at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:259) - ... 
17 more -{% endhighlight %} +{% top %}
diff --git a/docs/ops/filesystems/s3.md b/docs/ops/filesystems/s3.md index b668ac9..f25e266 100644 --- a/docs/ops/filesystems/s3.md +++ b/docs/ops/filesystems/s3.md @@ -49,31 +49,27 @@ env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>")); Note that these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup](../jobmanager_high_availability.html) or the [RocksDBStateBackend]({{ site.baseurl }}/ops/state/state_backends.html#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI. -For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and `flink-s3-fs-presto` S3 -filesystem wrappers which are fairly easy to set up. For some cases, however, e.g. for using S3 as -YARN's resource storage dir, it may be necessary to set up a specific Hadoop S3 FileSystem -implementation. Both ways are described below. +For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and `flink-s3-fs-presto` S3 filesystem wrappers which are self-contained and easy to set up. +For some cases, however, e.g., for using S3 as YARN's resource storage dir, it may be necessary to set up a specific [Hadoop S3 FileSystem](../deployment/aws.html#hadoop-provided-s3-file-systems) implementation. -### Shaded Hadoop/Presto S3 file systems +### Shaded Hadoop/Presto S3 File Systems {% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](../deployment/aws.html). %} Flink provides two file systems to talk to Amazon S3, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. -Both implementations are self-contained with no dependency footprint. -There is no need to add Hadoop to the classpath to use them. -Both internally use some Hadoop code, but "shade away" all classes to avoid any dependency conflicts. +Both implementations are self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use them. - `flink-s3-fs-presto`, registered under the scheme *"s3://"* and *"s3p://"*, is based on code from the [Presto project](https://prestodb.io/). - You can configure it the same way you can [configure the Presto file system](https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration) by placing adding the configurations to your `flink-conf.yaml`. + You can configure it the same way you can [configure the Presto file system](https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration) by adding the configurations to your `flink-conf.yaml`. Presto is the recommended file system for checkpointing to S3. - `flink-s3-fs-hadoop`, registered under *"s3://"* and *"s3a://"*, based on code from the [Hadoop Project](https://hadoop.apache.org/). - The file system can be [configured exactly like Hadoop's s3a](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) by placing adding the configurations to your `flink-conf.yaml`. + The file system can be [configured exactly like Hadoop's s3a](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) by adding the configurations to your `flink-conf.yaml`. Shaded Hadoop is the only S3 file system with support for the [StreamingFileSink]({{ site.baseurl}}/dev/connectors/streamfile_sink.html).
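For instance, a job can checkpoint through the Presto implementation while writing its results through the Hadoop implementation. The following is a minimal sketch, assuming both JARs are in `lib/`, that `stream` is a `DataStream<String>`, and that credentials are configured as described below:

{% highlight java %}
// Checkpoints go through flink-s3-fs-presto, addressed via "s3p://"
env.setStateBackend(new FsStateBackend("s3p://<your-bucket>/checkpoints"));

// The StreamingFileSink requires flink-s3-fs-hadoop, addressed via "s3a://"
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("s3a://<your-bucket>/output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();
stream.addSink(sink);
{% endhighlight %}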
Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem wrappers for URIs with the `s3://` scheme, `flink-s3-fs-hadoop` also registers for `s3a://` and `flink-s3-fs-presto` also registers for `s3p://`, so you can use this to use both at the same time. -This can happen when, for example, the job uses the [StreamingFileSink]({{ site.baseurl}}/dev/connectors/streamfile_sink.html) which only supports Hadoop, but uses Presto for checkpointing. +For example, a job may use the [StreamingFileSink]({{ site.baseurl}}/dev/connectors/streamfile_sink.html), which only supports Hadoop, but use Presto for checkpointing. In this case, it is advised to use explicitly *"s3a://"* as a scheme for the sink (Hadoop) and *"s3p://"* for checkpointing (Presto). To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR file from the `opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g. @@ -88,7 +84,7 @@ After setting up the S3 FileSystem wrapper, you need to make sure that Flink is ##### Identity and Access Management (IAM) (Recommended) -The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need in order to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam- [...] +The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for [...] If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink. @@ -106,7 +102,7 @@ s3.secret-key: your-secret-key ## Configure Non-S3 Endpoint The S3 Filesystems also support using S3 compliant object stores such as [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and [Minio](https://min.io/). -To do so, simply configure your endpoint in `flink-conf.yaml`. +To do so, configure your endpoint in `flink-conf.yaml`. {% highlight yaml %} s3.endpoint: your-endpoint-hostname @@ -115,18 +111,15 @@ s3.endpoint: your-endpoint-hostname ## Entropy injection for S3 file systems The bundled S3 file systems (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) support entropy injection. Entropy injection is -a technique to improve scalability of AWS S3 buckets through adding some random characters near the beginning of the key. +a technique to improve the scalability of AWS S3 buckets through adding some random characters near the beginning of the key. -If entropy injection is activated, a configured substring in the paths will be replaced by random characters. For example, path +If entropy injection is activated, a configured substring in the path is replaced with random characters. For example, path `s3://my-bucket/checkpoints/_entropy_/dashboard-job/` would be replaced by something like `s3://my-bucket/checkpoints/gf36ikvg/dashboard-job/`.
- -**Note that this only happens when the file creation passes the option to inject entropy!**, otherwise the file path will -simply remove the entropy key substring. See -[FileSystem.create(Path, WriteOption)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-) +**This only happens when the file creation passes the option to inject entropy!** +Otherwise, the file path removes the entropy key substring entirely. See [FileSystem.create(Path, WriteOption)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-) for details. -*Note: The Flink runtime currently passes the option to inject entropy only to checkpoint data files.* -*All other files, including checkpoint metadata and external URI do not inject entropy, to keep checkpoint URIs predictable.* +{% panel **Note:** The Flink runtime currently passes the option to inject entropy only to checkpoint data files. All other files, including checkpoint metadata and external URI, do not inject entropy to keep checkpoint URIs predictable. %} To enable entropy injection, configure the *entropy key* and the *entropy length* parameters. @@ -138,4 +131,6 @@ s3.entropy.length: 4 (default) The `s3.entropy.key` defines the string in paths that is replaced by the random characters. Paths that do not contain the entropy key are left unchanged. If a file system operation does not pass the *"inject entropy"* write option, the entropy key substring is simply removed. -The `s3.entropy.length` defined the number of random alphanumeric characters to replace the entropy key with. \ No newline at end of file +The `s3.entropy.length` defines the number of random alphanumeric characters used for entropy. + +{% top %} \ No newline at end of file diff --git a/docs/ops/filesystems/s3.zh.md b/docs/ops/filesystems/s3.zh.md index b668ac9..f25e266 100644 --- a/docs/ops/filesystems/s3.zh.md +++ b/docs/ops/filesystems/s3.zh.md @@ -49,31 +49,27 @@ env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>")); Note that these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup](../jobmanager_high_availability.html) or the [RocksDBStateBackend]({{ site.baseurl }}/ops/state/state_backends.html#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI. -For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and `flink-s3-fs-presto` S3 -filesystem wrappers which are fairly easy to set up. For some cases, however, e.g. for using S3 as -YARN's resource storage dir, it may be necessary to set up a specific Hadoop S3 FileSystem -implementation. Both ways are described below. +For most use cases, you may use one of our shaded `flink-s3-fs-hadoop` and `flink-s3-fs-presto` S3 filesystem wrappers which are self-contained and easy to set up. +For some cases, however, e.g., for using S3 as YARN's resource storage dir, it may be necessary to set up a specific [Hadoop S3 FileSystem](../deployment/aws.html#hadoop-provided-s3-file-systems) implementation. -### Shaded Hadoop/Presto S3 file systems +### Shaded Hadoop/Presto S3 File Systems {% panel **Note:** You don't have to configure this manually if you are running [Flink on EMR](../deployment/aws.html). 
%} Flink provides two file systems to talk to Amazon S3, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. -Both implementations are self-contained with no dependency footprint. -There is no need to add Hadoop to the classpath to use them. -Both internally use some Hadoop code, but "shade away" all classes to avoid any dependency conflicts. +Both implementations are self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use them. - `flink-s3-fs-presto`, registered under the scheme *"s3://"* and *"s3p://"*, is based on code from the [Presto project](https://prestodb.io/). - You can configure it the same way you can [configure the Presto file system](https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration) by placing adding the configurations to your `flink-conf.yaml`. + You can configure it the same way you can [configure the Presto file system](https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration) by adding the configurations to your `flink-conf.yaml`. Presto is the recommended file system for checkpointing to S3. - `flink-s3-fs-hadoop`, registered under *"s3://"* and *"s3a://"*, based on code from the [Hadoop Project](https://hadoop.apache.org/). - The file system can be [configured exactly like Hadoop's s3a](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) by placing adding the configurations to your `flink-conf.yaml`. + The file system can be [configured exactly like Hadoop's s3a](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) by adding the configurations to your `flink-conf.yaml`. Shaded Hadoop is the only S3 file system with support for the [StreamingFileSink]({{ site.baseurl}}/dev/connectors/streamfile_sink.html). Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem wrappers for URIs with the `s3://` scheme, `flink-s3-fs-hadoop` also registers for `s3a://` and `flink-s3-fs-presto` also registers for `s3p://`, so you can use this to use both at the same time. -This can happen when, for example, the job uses the [StreamingFileSink]({{ site.baseurl}}/dev/connectors/streamfile_sink.html) which only supports Hadoop, but uses Presto for checkpointing. +For example, a job may use the [StreamingFileSink]({{ site.baseurl}}/dev/connectors/streamfile_sink.html), which only supports Hadoop, but use Presto for checkpointing. In this case, it is advised to use explicitly *"s3a://"* as a scheme for the sink (Hadoop) and *"s3p://"* for checkpointing (Presto). To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR file from the `opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g. @@ -88,7 +84,7 @@ After setting up the S3 FileSystem wrapper, you need to make sure that Flink is ##### Identity and Access Management (IAM) (Recommended) -The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need in order to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam- [...]
+The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for [...] If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink. @@ -106,7 +102,7 @@ s3.secret-key: your-secret-key ## Configure Non-S3 Endpoint The S3 Filesystems also support using S3 compliant object stores such as [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and [Minio](https://min.io/). -To do so, simply configure your endpoint in `flink-conf.yaml`. +To do so, configure your endpoint in `flink-conf.yaml`. {% highlight yaml %} s3.endpoint: your-endpoint-hostname @@ -115,18 +111,15 @@ s3.endpoint: your-endpoint-hostname ## Entropy injection for S3 file systems The bundled S3 file systems (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) support entropy injection. Entropy injection is -a technique to improve scalability of AWS S3 buckets through adding some random characters near the beginning of the key. +a technique to improve the scalability of AWS S3 buckets through adding some random characters near the beginning of the key. -If entropy injection is activated, a configured substring in the paths will be replaced by random characters. For example, path +If entropy injection is activated, a configured substring in the path is replaced with random characters. For example, path `s3://my-bucket/checkpoints/_entropy_/dashboard-job/` would be replaced by something like `s3://my-bucket/checkpoints/gf36ikvg/dashboard-job/`. - -**Note that this only happens when the file creation passes the option to inject entropy!**, otherwise the file path will -simply remove the entropy key substring. See -[FileSystem.create(Path, WriteOption)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-) +**This only happens when the file creation passes the option to inject entropy!** +Otherwise, the file path removes the entropy key substring entirely. See [FileSystem.create(Path, WriteOption)](https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-) for details. -*Note: The Flink runtime currently passes the option to inject entropy only to checkpoint data files.* -*All other files, including checkpoint metadata and external URI do not inject entropy, to keep checkpoint URIs predictable.* +{% panel **Note:** The Flink runtime currently passes the option to inject entropy only to checkpoint data files. All other files, including checkpoint metadata and external URI, do not inject entropy to keep checkpoint URIs predictable. %} To enable entropy injection, configure the *entropy key* and the *entropy length* parameters. @@ -138,4 +131,6 @@ s3.entropy.length: 4 (default) The `s3.entropy.key` defines the string in paths that is replaced by the random characters. Paths that do not contain the entropy key are left unchanged. 
If a file system operation does not pass the *"inject entropy"* write option, the entropy key substring is simply removed. -The `s3.entropy.length` defined the number of random alphanumeric characters to replace the entropy key with. \ No newline at end of file +The `s3.entropy.length` defines the number of random alphanumeric characters used for entropy. + +{% top %} \ No newline at end of file
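To make the effect of entropy injection concrete, the following is a minimal sketch of pointing checkpoints at an entropy-injected path; the bucket and job names are placeholders:

{% highlight java %}
// With s3.entropy.key: _entropy_ configured in flink-conf.yaml, the "_entropy_"
// segment is replaced with random characters for checkpoint data files and
// removed for all other files, such as checkpoint metadata.
env.setStateBackend(new FsStateBackend("s3://<your-bucket>/checkpoints/_entropy_/<your-job>/"));
{% endhighlight %}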
