[
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279694#comment-17279694
]
Galen Warren commented on FLINK-11838:
--------------------------------------
Thanks [~xtsong]! I really appreciate your help on this. Sorry if I jumped the
gun on the PR; I'm happy to follow the process you've outlined, and if it makes
more sense to ultimately submit the PR in multiple pieces instead of one,
that's fine with me too.
So it seems the next step is to discuss the proposed design here. I'll take a
crack at that.
============================================
At a high level, the goal here is to implement RecoverableWriter for GCS, in
order to be able to use StreamingFileSink to write to GCS. In Flink,
recoverable writers are created by calling
[FileSystem.createRecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystem.html#createRecoverableWriter--],
so we will also need an implementation of org.apache.flink.core.fs.FileSystem
for GCS in order to expose the recoverable writer to Flink.
The org.apache.flink.core.fs.FileSystem implementation is straightforward.
Google provides an implementation of org.apache.hadoop.fs.FileSystem for GCS
via
[GoogleHadoopFileSystem|https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java],
which can already be used in Flink in other contexts, e.g. for checkpoints.
Flink provides
[HadoopFileSystem|https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.html],
which implements org.apache.flink.core.fs.FileSystem in terms of an underlying
org.apache.hadoop.fs.FileSystem. So, the proposed FileSystem for GCS would
extend HadoopFileSystem, constructing it with an instance of
GoogleHadoopFileSystem. This new FileSystem class would also override
createRecoverableWriter to return a RecoverableWriter implementation for GCS.
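Here's a rough sketch of what I have in mind (GcsFileSystem and
GcsRecoverableWriter are names I'm making up for discussion; GcsRecoverableWriter
is the new writer class described further below):
{code:java}
import java.io.IOException;

import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;

public class GcsFileSystem extends HadoopFileSystem {

    public GcsFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem) {
        // Delegate all standard file system operations (open, create, list,
        // delete, ...) to Google's Hadoop connector.
        super(googleHadoopFileSystem);
    }

    @Override
    public RecoverableWriter createRecoverableWriter() throws IOException {
        // Return the GCS-native writer proposed below instead of the
        // Hadoop-based one. (Constructor arguments TBD.)
        return new GcsRecoverableWriter(/* GCS storage client, options, ... */);
    }
}
{code}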
We also need an implementation of
[FileSystemFactory|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystemFactory.html],
registered via SPI, to expose the GCS FileSystem to Flink.
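A sketch of the factory, again with made-up names, and assuming a default
Hadoop Configuration for brevity (real code would need to wire in credentials
and other GCS settings):
{code:java}
import java.io.IOException;
import java.net.URI;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;

import org.apache.hadoop.conf.Configuration;

import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;

public class GcsFileSystemFactory implements FileSystemFactory {

    @Override
    public String getScheme() {
        // Handles URIs of the form gs://bucket/path
        return "gs";
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        // Assumption: default Hadoop config; real code would merge in
        // Flink/GCS configuration (credentials, project id, etc.).
        GoogleHadoopFileSystem ghfs = new GoogleHadoopFileSystem();
        ghfs.initialize(fsUri, new Configuration());
        return new GcsFileSystem(ghfs);
    }
}
{code}
Registration would then be the standard SPI mechanism: a
META-INF/services/org.apache.flink.core.fs.FileSystemFactory file containing
the factory's fully qualified class name.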
So, next is the interesting part – the GCS
[RecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html]
implementation. At a high level, RecoverableWriter allows one to create an
output stream, write bytes to that stream, persist and recover the state of
that stream, and ultimately commit the write operation or abort it. In GCS, I
propose to do recoverable writes in two steps:
* First, write a blob to a temporary location using the [resumable
uploads|https://cloud.google.com/storage/docs/resumable-uploads] feature of
GCS. This allows for blobs to be written in chunks over a potentially long
period of time (up to one week, per the docs). Importantly, it also allows for
the write state to be persisted and recovered, via
[WriteChannel.capture|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#capture--]
and
[RestorableState.restore|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/RestorableState.html],
which we'll use to implement the persist/recover functionality in
RecoverableWriter.
* Second, commit the write operation by copying the temporary blob to the
"final" blob location, i.e. the one specified in the initial call to
RecoverableWriter.open, and then deleting the temporary blob. In the event of an
aborted upload, cleanup would consist of deleting just the temporary blob. (Both
steps are sketched in code after this list.)
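To make the two steps concrete, here's a minimal sketch against the
google-cloud-storage client, with everything inline. Bucket and blob names are
illustrative, and in the real writer the capture/restore/copy/delete calls
would of course be split across persist, recover, commit, and cleanup:
{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.google.cloud.RestorableState;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class TwoStepWriteSketch {

    public static void main(String[] args) throws Exception {
        Storage storage = StorageOptions.getDefaultInstance().getService();

        BlobId tempBlob = BlobId.of("my-bucket", ".inprogress/part-0-0");
        BlobId finalBlob = BlobId.of("my-bucket", "part-0-0");

        // Step 1: start a resumable upload to the temporary location.
        WriteChannel channel = storage.writer(BlobInfo.newBuilder(tempBlob).build());
        channel.write(ByteBuffer.wrap("some bytes".getBytes(StandardCharsets.UTF_8)));

        // "persist": capture the in-progress upload state. This, plus the
        // write position we track ourselves, becomes the ResumeRecoverable.
        RestorableState<WriteChannel> state = channel.capture();

        // "recover": restore the channel and continue writing where we left off.
        WriteChannel restored = state.restore();
        restored.write(ByteBuffer.wrap("more bytes".getBytes(StandardCharsets.UTF_8)));
        restored.close(); // finalizes the upload of the temporary blob

        // Step 2, "commit": copy the temporary blob to the final location,
        // then delete the temporary blob. An abort would just delete it.
        storage.copy(Storage.CopyRequest.of(tempBlob, finalBlob)).getResult();
        storage.delete(tempBlob);
    }
}
{code}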
So, in this approach, the recoverable state from Flink's perspective (i.e.
CommitRecoverable and ResumeRecoverable) would include:
* The RestorableState returned from WriteChannel
* The write position (we have to keep track of this ourselves, because
WriteChannel doesn't expose this)
* The locations of the temporary and final blobs, so that we can ultimately
commit or clean up the overall operation (a sketch of such a state object
follows)
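As a sketch, that state might be bundled into something like the following.
Names are mine, and since Flink's ResumeRecoverable extends CommitRecoverable,
one class could potentially serve as both:
{code:java}
import java.io.Serializable;

import org.apache.flink.core.fs.RecoverableWriter;

import com.google.cloud.RestorableState;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;

public class GcsRecoverable implements
        RecoverableWriter.ResumeRecoverable, Serializable {

    // Opaque resumable-upload state from WriteChannel.capture().
    private final RestorableState<WriteChannel> channelState;

    // Write position, tracked by us since WriteChannel does not expose it.
    private final long position;

    // Temporary and final blob locations, needed for commit/cleanup.
    private final BlobId tempBlobId;
    private final BlobId finalBlobId;

    public GcsRecoverable(
            RestorableState<WriteChannel> channelState,
            long position,
            BlobId tempBlobId,
            BlobId finalBlobId) {
        this.channelState = channelState;
        this.position = position;
        this.tempBlobId = tempBlobId;
        this.finalBlobId = finalBlobId;
    }

    // ... getters omitted ...
}
{code}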
That's basically it at a high level.
I do want to point out one possible conflict with the Flink coding guidelines,
though, to get your thoughts. The guidelines say – very emphatically! – not to
use Java serialization for anything. In the GCS case, the RestorableState that
is returned from WriteChannel.capture is an object that implements Serializable
but is otherwise opaque. This object does need to be serialized/deserialized as
part of the RecoverableWriter implementation, and it's not clear to me how to
do that except by using Java serialization.
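For illustration, the RestorableState piece of a SimpleVersionedSerializer
implementation would, as far as I can tell, have to look something like this
(helper names are mine):
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import com.google.cloud.RestorableState;
import com.google.cloud.WriteChannel;

public final class RestorableStateSerde {

    // RestorableState is Serializable but otherwise opaque, so Java
    // serialization appears to be the only way to turn it into bytes.
    static byte[] serializeChannelState(RestorableState<WriteChannel> state)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static RestorableState<WriteChannel> deserializeChannelState(byte[] serialized)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(serialized))) {
            return (RestorableState<WriteChannel>) in.readObject();
        }
    }
}
{code}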
========================================
I'll stop there for now; please let me know if this is the sort of information
you're looking for here. I'm happy to drill into any area in more detail if
that would help. Thanks again!
> Create RecoverableWriter for GCS
> --------------------------------
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 1.8.0
> Reporter: Fokko Driesprong
> Assignee: Galen Warren
> Priority: Major
> Labels: pull-request-available
> Time Spent: 20m
> Remaining Estimate: 0h
>
> GCS supports resumable uploads, which we can use to create a RecoverableWriter
> similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface:
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on renaming
> files on commit:
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to
> implement a more GCS-native RecoverableWriter.