pnowojski opened a new pull request, #25028:
URL: https://github.com/apache/flink/pull/25028

   ## What is the purpose of the change
   
   State downloading in Flink can be a time- and CPU-consuming operation. This is especially visible when CPU resources per task slot are strictly limited, for example to a single CPU. Downloading 1 GB of state can take a significant amount of time, and the code that does so is quite inefficient.
   
   Currently, when downloading state files, Flink creates an `FSDataInputStream` from the remote file and copies its bytes to an `OutputStream` pointing to a local file (in the `RocksDBStateDownloader#downloadDataForStateHandle` method). Internally, `FSDataInputStream` is wrapped in many layers of abstraction and indirection. Worse, every file is copied individually, which leads to quite high overheads for small files. Both download time and the CPU efficiency of the download process can be significantly improved by introducing an API that allows `org.apache.flink.core.fs.FileSystem` to copy many files natively and all at once.
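A minimal sketch of what such a batch-copy API could look like. All names here (`CopyRequest`, `BulkCopyingFileSystem`, `StreamCopyingFileSystem`) are hypothetical and are not Flink's actual interfaces. The stream-based implementation mirrors the per-file approach described above. To stay self-contained and runnable, it copies between local paths with `java.nio.file` rather than from a remote file system.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class BulkCopySketch {

    /** Hypothetical description of a single file to copy. */
    public record CopyRequest(Path source, Path destination) {}

    /**
     * Hypothetical batch-copy API: the caller hands over every file at once, so an
     * implementation is free to parallelize the transfers or delegate them to a native tool.
     */
    public interface BulkCopyingFileSystem {
        void copyFiles(List<CopyRequest> requests) throws IOException;
    }

    /**
     * Baseline mirroring the per-file approach: open an input stream for each
     * source file and copy its bytes into an output stream for the local file.
     */
    public static final class StreamCopyingFileSystem implements BulkCopyingFileSystem {
        @Override
        public void copyFiles(List<CopyRequest> requests) throws IOException {
            for (CopyRequest request : requests) {
                Path parent = request.destination().getParent();
                if (parent != null) {
                    Files.createDirectories(parent);
                }
                try (InputStream in = Files.newInputStream(request.source());
                        OutputStream out = Files.newOutputStream(request.destination())) {
                    in.transferTo(out); // one file at a time, sequentially
                }
            }
        }
    }
}
```

The interface takes the whole list in one call. A native implementation could then schedule all transfers together instead of paying per-file overhead in a sequential loop.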
   
   For S3, there are at least two potential implementations. The first is to use AWS SDK v2 directly (Flink currently uses AWS SDK v1, wrapped by Hadoop/Presto) together with the Amazon S3 Transfer Manager. The second is to use a third-party tool called s5cmd, which claims to be a faster alternative to the official AWS clients; our benchmarks confirmed this.
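A minimal sketch of driving s5cmd in batch mode from Java. It assumes s5cmd's `run` subcommand, which reads one command per line (from a file, or from stdin when no file is given) and executes them concurrently. It also assumes s5cmd's global `--endpoint-url` flag. The class and method names are hypothetical, the sketch assumes paths contain no whitespace, and it is not the implementation in this PR.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class S5cmdBatchSketch {

    /** Hypothetical pair of an S3 object URI and the local path to download it to. */
    public record Download(String s3Uri, String localPath) {}

    /** Builds one "cp <source> <destination>" line per file (assumes no whitespace in paths). */
    public static String buildCommands(List<Download> downloads) {
        StringBuilder commands = new StringBuilder();
        for (Download d : downloads) {
            commands.append("cp ").append(d.s3Uri()).append(' ').append(d.localPath()).append('\n');
        }
        return commands.toString();
    }

    /** Builds the argument vector; with no file argument, "run" reads commands from stdin. */
    public static List<String> buildArgv(String s5cmdBinary, String endpointUrl) {
        List<String> argv = new ArrayList<>();
        argv.add(s5cmdBinary);
        if (endpointUrl != null) {
            // Global flag, e.g. to point s5cmd at an S3-compatible store such as MinIO.
            argv.add("--endpoint-url");
            argv.add(endpointUrl);
        }
        argv.add("run");
        return argv;
    }

    /** Starts a single s5cmd process for the whole batch and waits for it to finish. */
    public static void execute(List<String> argv, String commands)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(argv)
                .redirectErrorStream(true)
                // Inherit stdout so a full output pipe cannot block s5cmd while we write stdin.
                .redirectOutput(ProcessBuilder.Redirect.INHERIT)
                .start();
        try (Writer stdin = new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8)) {
            stdin.write(commands);
        }
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IOException("s5cmd exited with code " + exitCode);
        }
    }
}
```

Spawning one process for the entire file list, rather than one per file, keeps the per-file overhead inside s5cmd.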
   
   
   ## Brief change log
   
   This PR covers the first two steps of FLIP-444:
   - [Provide native file copy support for S3 using s5cmd](https://issues.apache.org/jira/browse/FLINK-35767)
   - [Use native file copy in RocksDBStateDownloader](https://issues.apache.org/jira/browse/FLINK-35768)
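One way a downloader could use a native multi-file copy capability, sketched with hypothetical names. It collects every file for a restore into a single batch. If the file system supports native multi-file copy, it hands over the batch in one call. Otherwise it falls back to copying file by file. The fallback path is an assumption for illustration; this description does not specify it.

```java
import java.io.IOException;
import java.util.List;

public class DownloaderDispatchSketch {

    /** Hypothetical description of one remote file and its local destination. */
    public record CopyRequest(String source, String destination) {}

    /** Hypothetical capability implemented by file systems with native multi-file copy. */
    public interface BulkCopying {
        void copyFiles(List<CopyRequest> requests) throws IOException;
    }

    /** Hypothetical stand-in for the existing stream-based, one-file-at-a-time copy. */
    public interface SingleFileCopier {
        void copy(CopyRequest request) throws IOException;
    }

    /**
     * Downloads all requested files and returns how many copy calls were issued,
     * which makes the difference between the two paths visible.
     */
    public static int download(Object fileSystem, List<CopyRequest> requests,
            SingleFileCopier fallback) throws IOException {
        if (requests.isEmpty()) {
            return 0;
        }
        if (fileSystem instanceof BulkCopying bulk) {
            bulk.copyFiles(requests); // the whole batch in a single call
            return 1;
        }
        for (CopyRequest request : requests) {
            fallback.copy(request); // one call per file
        }
        return requests.size();
    }
}
```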
   
   ## Verifying this change
   
   This change is covered by a couple of new ITCases that run s5cmd against MinIO, as well as by some unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (**yes** / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
   

