[
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281769#comment-17281769
]
Galen Warren commented on FLINK-11838:
--------------------------------------
Hi [~xintongsong] – a couple more thoughts here.
I think I understand why the code in
[BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java]
and
[BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java]
is more complicated than I expected; there is a
[requirement|https://cloud.google.com/storage/docs/performing-resumable-uploads]
in resumable uploads that every uploaded chunk (except the last one) be a
multiple of 256 KiB, and so the writers have to buffer data in order to meet
that requirement. The RestorableState then, in general, contains some amount of
buffered data, i.e. data that has been written but that has not yet been sent
as part of an upload chunk.
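To make that buffering requirement concrete, here's a minimal sketch (names and numbers are illustrative, not the actual client code) of how a writer might decide how much buffered data can be flushed as a non-final chunk versus how much must be carried over:

```java
// Sketch (not Flink or google-cloud-storage code): illustrates the 256 KiB
// alignment rule for resumable-upload chunks. Only the largest multiple of
// 256 KiB may be sent as a non-final chunk; the remainder stays buffered.
public class ChunkAlignment {
    static final int CHUNK_GRANULARITY = 256 * 1024; // 256 KiB

    // Bytes that may be sent now as a non-final chunk.
    static int flushableBytes(int buffered) {
        return (buffered / CHUNK_GRANULARITY) * CHUNK_GRANULARITY;
    }

    // Bytes that must remain buffered until more data arrives
    // or the upload is finalized.
    static int carryOverBytes(int buffered) {
        return buffered % CHUNK_GRANULARITY;
    }

    public static void main(String[] args) {
        int buffered = 600 * 1024; // caller has written 600 KiB
        System.out.println(flushableBytes(buffered)); // 524288 (512 KiB)
        System.out.println(carryOverBytes(buffered)); // 90112  (88 KiB)
    }
}
```

The carried-over remainder is exactly the buffered data that ends up inside the RestorableState.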
Also, I came across a somewhat disturbing
[thread|https://issuetracker.google.com/issues/137168102] from last year, in
which a GCS bug was discussed that, essentially, caused in-progress resumable
uploads to disappear and return 410 GONE errors. Such a failure would obviously
be a big problem for the Flink use case we're talking about here. That thread
claims this bug is fixed as of July 2020; however, it got me thinking about
alternative implementations, especially since they provided this guidance:
{quote}To retry successfully, catching 500 and 410 errors is required and, as
the official documentation recommends
([https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone]),
implementing a retry by starting a new session for the upload that received an
unsuccessful status code but still needs uploading.
{quote}
.... which we would be unable to follow.
So here's a different implementation idea. What if we didn't use resumable
uploads at all, and instead uploaded data as a series of temporary blobs, each
written via a normal, non-resumable upload? The
[RecoverableFsDataOutputStream|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html]
could buffer written data and write temporary blobs at various points (flush,
sync, persist, close, or even write, if the buffered size exceeds some
threshold). In this approach, the resumable state would include a list of all
the temporary blobs uploaded so far for this write, and on commit the
temporary blobs would be combined into one and copied to the final blob
location.
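As a rough illustration of how small that recoverable state could be under this design, here's a hypothetical sketch (all names are made up, not a proposed API) that serializes the state by hand rather than relying on Java serialization:

```java
import java.io.*;
import java.util.*;

// Hypothetical sketch of the recoverable state under this design: just the
// ordered list of temporary blob names plus the byte count written so far,
// serialized by hand so recovery does not depend on Java serialization.
public class GcsRecoverableState {
    final List<String> tempBlobNames;
    final long bytesWritten;

    GcsRecoverableState(List<String> tempBlobNames, long bytesWritten) {
        this.tempBlobNames = tempBlobNames;
        this.bytesWritten = bytesWritten;
    }

    byte[] serialize() {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeLong(bytesWritten);
            out.writeInt(tempBlobNames.size());
            for (String name : tempBlobNames) {
                out.writeUTF(name);
            }
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // in-memory streams don't throw
        }
    }

    static GcsRecoverableState deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            long bytesWritten = in.readLong();
            int count = in.readInt();
            List<String> names = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                names.add(in.readUTF());
            }
            return new GcsRecoverableState(names, bytesWritten);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        GcsRecoverableState state = new GcsRecoverableState(
                Arrays.asList("tmp/part-0-0", "tmp/part-0-1"), 1024L);
        GcsRecoverableState restored = deserialize(state.serialize());
        System.out.println(restored.tempBlobNames + " @ " + restored.bytesWritten);
    }
}
```

The real state would presumably also carry the target blob location and any in-memory buffer, but the point is that the wire format is entirely under our control.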
To combine the temp blobs into the final blob, we could use the
[compose|https://cloud.google.com/storage/docs/composing-objects] feature of
GCS, which combines up to 32 blobs into one in a single call; an arbitrary
number of blobs could then be composed via repeated calls with a pretty simple
algorithm.
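A sketch of that algorithm, with the actual GCS compose call abstracted behind a function so the example stays self-contained (in real code the step would be a call to the storage client's compose operation, and the intermediate names are made up here):

```java
import java.util.*;
import java.util.function.BiFunction;

// Sketch: repeatedly fold the first (up to) 32 blobs into one intermediate
// blob until a single blob remains, preserving byte order. The compose step
// is abstracted so the sketch is independent of the GCS client library.
public class ComposeFold {
    static final int MAX_COMPOSE_SOURCES = 32; // GCS limit per compose call

    // compose.apply(sources, targetName) merges the sources, in order, into a
    // new blob named targetName and returns that blob's name.
    static String composeAll(List<String> blobs,
                             BiFunction<List<String>, String, String> compose) {
        List<String> remaining = new ArrayList<>(blobs);
        int round = 0;
        while (remaining.size() > 1) {
            int take = Math.min(MAX_COMPOSE_SOURCES, remaining.size());
            List<String> batch = new ArrayList<>(remaining.subList(0, take));
            String target = compose.apply(batch, "intermediate-" + round++);
            remaining.subList(0, take).clear();
            remaining.add(0, target); // composed blob replaces its batch
        }
        return remaining.get(0);
    }

    public static void main(String[] args) {
        // Demo compose step: concatenation stands in for the real GCS call.
        String result = composeAll(Arrays.asList("a", "b", "c"),
                (sources, name) -> String.join("", sources));
        System.out.println(result); // abc
    }
}
```

Each call reduces the outstanding blob count by up to 31, so even a large backlog of temp blobs collapses quickly.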
A few additional benefits of this approach, potentially:
* Each temp blob write would be a normal, nonresumable upload with known
contents and size at the time of upload, so we could use CRC checks as
recommended [here|https://cloud.google.com/storage/docs/hashes-etags].
* We'd sidestep the one-week limit on resumable upload sessions; the lifetime
of temporary blobs could be made indefinite or managed via bucket policy.
* We would be in full control of the resumable state, so we'd avoid any issues
related to Java serialization.
In this approach, we'd probably need a Flink option to set the maximum size of
a temporary blob, which would control how frequently temporary blobs would be
created. Tuning this would involve a tradeoff between memory usage and
checkpoint size on the Flink side vs. time to commit the writes via the
recoverable writer, as I'd expect it to take longer to compose a larger number
of smaller temporary blobs together into the final blob than it would a smaller
number of larger temporary blobs.
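To put rough numbers on that tradeoff: each 32-way compose call reduces the outstanding blob count by 31, so merging n temp blobs takes ceil((n - 1) / 31) calls. A small illustrative calculation (the sizes are made up, not proposed defaults):

```java
// Back-of-the-envelope cost of the commit step: number of 32-way compose
// calls needed to fold n temporary blobs into one final blob.
public class ComposeCost {
    static long composeCalls(long tempBlobs) {
        if (tempBlobs <= 1) {
            return 0;
        }
        return (tempBlobs - 2) / 31 + 1; // ceil((n - 1) / 31)
    }

    public static void main(String[] args) {
        long totalBytes = 1L << 30; // 1 GiB written overall (illustrative)
        // Compare 1 MiB vs 64 MiB temp-blob size limits.
        for (long blobSize : new long[]{1L << 20, 1L << 26}) {
            long blobs = (totalBytes + blobSize - 1) / blobSize;
            System.out.println((blobSize >> 20) + " MiB blobs -> "
                    + blobs + " temp blobs, "
                    + composeCalls(blobs) + " compose call(s)");
        }
    }
}
```

So for 1 GiB written, 64 MiB temp blobs commit in a single compose call, while 1 MiB temp blobs need 33, which is the commit-latency side of the tuning knob.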
Anyway, just a thought – let me know what you think. And enjoy the New Year! :)
> Create RecoverableWriter for GCS
> --------------------------------
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / FileSystem
> Affects Versions: 1.8.0
> Reporter: Fokko Driesprong
> Assignee: Galen Warren
> Priority: Major
> Labels: pull-request-available, usability
> Fix For: 1.13.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface:
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming
> of the files on the commit:
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to
> implement a more GCS native RecoverableWriter
--
This message was sent by Atlassian Jira
(v8.3.4#803005)