[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281769#comment-17281769
 ] 

Galen Warren commented on FLINK-11838:
--------------------------------------

Hi [~xintongsong] – a couple more thoughts here.

I think I understand why the code in 
[BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java]
 and 
[BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java]
 is more complicated than I expected; there is a 
[requirement|https://cloud.google.com/storage/docs/performing-resumable-uploads]
 in resumable uploads that all the uploaded chunks (except the last one) be a 
multiple of 256 KiB, and so the writers have to buffer data in order to meet 
that requirement. The RestorableState then, in general, contains some amount of 
buffered data, i.e. data that has been written but that has not yet been sent 
as part of an upload chunk.
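
To make the buffering concrete, here is a minimal sketch of the arithmetic involved. It is not the code from BlobWriteChannel or BaseWriteChannel; the class and method names are hypothetical, and it only assumes the 256 KiB rule from the linked requirement.

```java
/** Illustrative only: how much buffered data can go out as a non-final chunk. */
final class ChunkAlignment {
    /** Every chunk except the last must be a multiple of 256 KiB. */
    static final int CHUNK_MULTIPLE = 256 * 1024;

    /** Number of buffered bytes that can be sent now as a non-final chunk. */
    static long sendableBytes(long buffered) {
        return (buffered / CHUNK_MULTIPLE) * CHUNK_MULTIPLE;
    }

    /** Bytes that must stay buffered, and so would end up in the restorable state. */
    static long retainedBytes(long buffered) {
        return buffered % CHUNK_MULTIPLE;
    }
}
```

For example, with 600 KiB written, 512 KiB could be sent and the remaining 88 KiB would have to be carried in the state.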

Also, I came across a somewhat disturbing 
[thread|https://issuetracker.google.com/issues/137168102] from last year, where 
a GCS bug was being discussed that, essentially, caused in-progress resumable 
uploads to disappear and return 410 GONE errors. Such a failure would obviously 
be a big problem for the Flink use case we're talking about here. That thread 
claims this bug is fixed as of July 2020; however, it got me thinking about 
alternative implementations, especially since they provided this guidance:
{quote}To retry successfully, catching 500 and 410 errors is required and, as 
the official documentation recommends 
([https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone]), 
implementing a retry by starting a new session for the upload that received an 
unsuccessful status code but still needs uploading. 
{quote}
... which we would be unable to follow.

So here's a different implementation idea. What if we didn't use resumable 
uploads at all, and instead uploaded data as a series of temporary blobs, 
each of which would be written via a normal, nonresumable upload? The 
[RecoverableFsDataOutputStream|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html]
 could buffer written data and write temporary blobs at various points (flush, 
sync, persist, close, or even write, if the buffered size exceeds some 
threshold). In this approach, the resumable state would include a list of all 
the temporary blobs uploaded for this write, and on commit the temporary blobs 
would be combined into one blob and copied to the final blob location.
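
A rough sketch of what I have in mind, under stated assumptions: BlobUploader is a hypothetical stand-in for a plain nonresumable GCS upload, TempBlobWriter is not a real Flink class, and the ".inprogress/" naming is made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical stand-in for a normal, nonresumable blob upload. */
interface BlobUploader {
    void upload(String blobName, byte[] contents);
}

/** Buffers writes and spills them to temporary blobs; not an actual Flink class. */
final class TempBlobWriter {
    private final BlobUploader uploader;
    private final int maxTempBlobSize; // assumed option: flush threshold in bytes
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final List<String> tempBlobNames = new ArrayList<>();

    TempBlobWriter(BlobUploader uploader, int maxTempBlobSize) {
        this.uploader = uploader;
        this.maxTempBlobSize = maxTempBlobSize;
    }

    /** Buffers data, uploading a temp blob once the threshold is reached. */
    void write(byte[] data) {
        buffer.write(data, 0, data.length);
        if (buffer.size() >= maxTempBlobSize) {
            flushToTempBlob();
        }
    }

    /** Uploads anything still buffered and returns the state to checkpoint. */
    List<String> persist() {
        flushToTempBlob();
        return new ArrayList<>(tempBlobNames);
    }

    private void flushToTempBlob() {
        if (buffer.size() == 0) {
            return; // nothing to upload
        }
        String name = ".inprogress/" + UUID.randomUUID(); // made-up naming scheme
        uploader.upload(name, buffer.toByteArray());
        tempBlobNames.add(name);
        buffer.reset();
    }
}
```

The point is that the recoverable state is just an ordered list of blob names, with no buffered bytes left over after persist.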

To combine the temp blobs into the final blob, we could use the 
[compose|https://cloud.google.com/storage/docs/composing-objects] feature of 
GCS, which allows combining up to 32 blobs into one in a single call, and which 
could be applied repeatedly to compose an arbitrary number of blobs with a 
pretty simple algorithm.
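
One possible version of that algorithm, as a sketch only: compose the first 32 temp blobs into the target, then repeatedly compose the target with the next 31. BlobComposer is a hypothetical stand-in for a single GCS compose call, and I'm assuming the target blob may itself be listed as the first source of a later call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one GCS compose call (at most 32 sources). */
interface BlobComposer {
    void compose(List<String> sourceNames, String targetName);
}

final class ComposeAll {
    static final int MAX_SOURCES = 32;

    /** Composes all sources, in order, into target; returns the number of compose calls. */
    static int composeAll(List<String> sources, String target, BlobComposer composer) {
        if (sources.isEmpty()) {
            throw new IllegalArgumentException("no source blobs");
        }
        // First call: up to 32 temp blobs straight into the target.
        int next = Math.min(MAX_SOURCES, sources.size());
        composer.compose(new ArrayList<>(sources.subList(0, next)), target);
        int calls = 1;
        // Later calls: the target so far plus up to 31 more temp blobs.
        while (next < sources.size()) {
            int end = Math.min(next + MAX_SOURCES - 1, sources.size());
            List<String> batch = new ArrayList<>();
            batch.add(target);
            batch.addAll(sources.subList(next, end));
            composer.compose(batch, target);
            calls++;
            next = end;
        }
        return calls;
    }
}
```

With this scheme, 70 temp blobs would take three compose calls (32, then 31 more, then the last 7).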

A few additional benefits of this approach, potentially:
 * Each temp blob write would be a normal, nonresumable upload with known 
contents and size at the time of upload, so we could use CRC checks as 
recommended [here|https://cloud.google.com/storage/docs/hashes-etags].
 * We'd sidestep the one-week limitation; the lifetime of temporary blobs could 
be made indefinite or managed via bucket policy.
 * We would be in full control of the resumable state, so we'd avoid any issues 
related to Java serialization.
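
On the CRC point, here is a small sketch of computing the checksum for a temp blob before upload. It uses java.util.zip.CRC32C (Java 9+); Crc32cUtil is a hypothetical helper, and how the value gets attached to the upload request is left out since that depends on the client library.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.zip.CRC32C;

/** Hypothetical helper for computing a temp blob's CRC32C before upload. */
final class Crc32cUtil {
    /** CRC32C (Castagnoli) checksum of the full contents. */
    static int crc32c(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        return (int) crc.getValue();
    }

    /** Base64 of the checksum's four bytes in big-endian order, the form GCS uses. */
    static String base64Crc32c(byte[] data) {
        // ByteBuffer writes ints in big-endian order by default.
        byte[] bigEndian = ByteBuffer.allocate(4).putInt(crc32c(data)).array();
        return Base64.getEncoder().encodeToString(bigEndian);
    }
}
```

Since each temp blob's contents are fully known at upload time, the checksum can be computed up front rather than accumulated across chunks.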

In this approach, we'd probably need a Flink option to set the maximum size of 
a temporary blob, which would control how frequently temporary blobs would be 
created. Tuning this would involve a tradeoff between memory usage and 
checkpoint size on the Flink side vs. time to commit the writes via the 
recoverable writer, as I'd expect it to take longer to compose a larger number 
of smaller temporary blobs together into the final blob than it would a smaller 
number of larger temporary blobs.

Anyway, just a thought – let me know what you think. And enjoy the New Year! :)


> Create RecoverableWriter for GCS
> --------------------------------
>
>                 Key: FLINK-11838
>                 URL: https://issues.apache.org/jira/browse/FLINK-11838
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / FileSystem
>    Affects Versions: 1.8.0
>            Reporter: Fokko Driesprong
>            Assignee: Galen Warren
>            Priority: Major
>              Labels: pull-request-available, usability
>             Fix For: 1.13.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS native RecoverableWriter 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
