pnowojski opened a new pull request, #25028: URL: https://github.com/apache/flink/pull/25028
## What is the purpose of the change State downloading in Flink can be a time and CPU consuming operation, which is especially visible if CPU resources per task slot are strictly restricted to for example a single CPU. Downloading 1GB of state size can take significant amount of time, while the code doing so is quite inefficient. Currently when downloading state files, Flink is creating an FSDataInputStream from the remote file, and copies its bytes, to an OutputStream pointing to a local file (in the RocksDBStateDownloader#downloadDataForStateHandle method). FSDataInputStream internally is being wrapped by many layers of abstractions and indirections and what’s worse, every file is being copied individually, which leads to quite high overheads for small files. Download times and download process CPU efficiency can be significantly improved if we introduced an API to allow org.apache.flink.core.fs.FileSystem to copy many files natively and all at once. For S3, there are at least two potential implementations. The first one is using AWS SDKv2 directly (Flink currently is using AWS SDKv1 wrapped by hadoop/presto) and Amazon S3 Transfer Manager. Second option is to use a 3rd party tool called s5cmd. It is claimed to be a faster alternative to the official AWS clients, which was confirmed by our benchmarks. ## Brief change log This PR covers two first steps for FLIP-444: - [Provide native file copy support for S3 using s5cmd](https://issues.apache.org/jira/browse/FLINK-35767) - [Use native file copy in RocksDBStateDownloader](https://issues.apache.org/jira/browse/FLINK-35768) ## Verifying this change This change is covered by a couple of new ITCases, that run s5cmd against Minio and some unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (**yes** / no / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
