[ 
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279694#comment-17279694
 ] 

Galen Warren commented on FLINK-11838:
--------------------------------------

Thanks [~xtsong]! I really appreciate your help on this. Sorry if I jumped the 
gun on the PR; I'm happy to follow the process you've outlined, and if it 
ultimately makes more sense to submit the PR in multiple pieces instead of one, 
that's fine with me too.

So it seems the next step is to discuss the proposed design here. I'll take a 
crack at that.

============================================

At a high level, the goal here is to implement RecoverableWriter for GCS, in 
order to be able to use StreamingFileSink to write to GCS. In Flink, 
recoverable writers are created by calling 
[FileSystem.createRecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystem.html#createRecoverableWriter--],
 so we will also need an implementation of org.apache.flink.core.fs.FileSystem 
for GCS in order to expose the recoverable writer to Flink.

The org.apache.flink.core.fs.FileSystem implementation is straightforward. 
Google provides an implementation of org.apache.hadoop.fs.FileSystem for GCS 
via 
[GoogleHadoopFileSystem|https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java],
 which can already be used in Flink in other contexts, e.g. for checkpoints. 
Flink provides 
[HadoopFileSystem|https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.html],
 which implements org.apache.flink.core.fs.FileSystem in terms of an underlying 
org.apache.hadoop.fs.FileSystem. So, the proposed FileSystem for GCS would 
extend HadoopFileSystem, constructing it with an instance of 
GoogleHadoopFileSystem. This new FileSystem class would also override 
createRecoverableWriter to return a RecoverableWriter implementation for GCS. 
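As a rough sketch, the wrapper might look like this. GcsFileSystem and 
GcsRecoverableWriter are placeholder names I'm making up here; HadoopFileSystem 
and GoogleHadoopFileSystem are the real classes linked above:

```java
// Hypothetical sketch -- GcsFileSystem and GcsRecoverableWriter are
// placeholder names, not existing classes.
public class GcsFileSystem extends HadoopFileSystem {

    public GcsFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem) {
        // Delegate all ordinary FileSystem operations to the Hadoop connector.
        super(googleHadoopFileSystem);
    }

    @Override
    public RecoverableWriter createRecoverableWriter() throws IOException {
        // Return the GCS-native writer instead of the rename-based
        // Hadoop implementation.
        return new GcsRecoverableWriter(this);
    }
}
```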

We also need an implementation of 
[FileSystemFactory|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystemFactory.html],
 registered with Flink via SPI, to expose the GCS FileSystem.
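For illustration, the factory could look roughly like this (class name and the 
configuration helper are placeholders; getScheme/create are the actual 
FileSystemFactory methods):

```java
// Hypothetical sketch; GcsFileSystemFactory and loadHadoopConfig()
// are placeholders.
public class GcsFileSystemFactory implements FileSystemFactory {

    @Override
    public String getScheme() {
        return "gs";
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        GoogleHadoopFileSystem googleFs = new GoogleHadoopFileSystem();
        googleFs.initialize(fsUri, loadHadoopConfig()); // placeholder helper
        return new GcsFileSystem(googleFs);
    }
}
```

The factory would then be registered in a 
META-INF/services/org.apache.flink.core.fs.FileSystemFactory file so that 
Flink's SPI lookup can discover it.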

So, next is the interesting part – the GCS 
[RecoverableWriter|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html]
 implementation. At a high level, RecoverableWriter allows one to create an 
output stream, write bytes to that stream, persist and recover the state of 
that stream, and ultimately commit the write operation or abort it. In GCS, I 
propose to do recoverable writes in two steps:
 * First, write a blob to a temporary location using the [resumable 
uploads|https://cloud.google.com/storage/docs/resumable-uploads] feature of 
GCS. This allows for blobs to be written in chunks over a potentially long 
period of time (up to one week, per the docs). Importantly, it also allows for 
the write state to be persisted and recovered, via 
[WriteChannel.capture|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#capture--]
 and 
[RestorableState.restore|https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/RestorableState.html],
 which we'll use to implement the persist/recover functionality in 
RecoverableWriter.
 * Second, commit the write operation by copying the temporary blob to the 
"final" blob location, i.e. the one specified in the initial call to 
RecoverableWriter.open, and deleting the temporary blob. In the event of an 
aborted upload, the cleanup would consist of deleting just the temporary blob.
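The two steps above, expressed against the google-cloud-storage client API 
(bucket and object names here are made-up placeholders; the 
WriteChannel/RestorableState calls are the real APIs linked above):

```java
// Placeholder bucket/object names; not runnable without GCS credentials.
Storage storage = StorageOptions.getDefaultInstance().getService();
BlobId tempBlob = BlobId.of("my-bucket", ".inprogress/part-0");
BlobId finalBlob = BlobId.of("my-bucket", "part-0");

// Step 1: resumable upload to the temporary blob.
WriteChannel channel = storage.writer(BlobInfo.newBuilder(tempBlob).build());
channel.write(ByteBuffer.wrap(someBytes));

// Persist: capture the channel state so the upload can be resumed later.
RestorableState<WriteChannel> state = channel.capture();
// Recover: restore the channel from the captured state and keep writing.
WriteChannel resumed = state.restore();

// Step 2 (commit): finish the upload, copy temp -> final, delete temp.
resumed.close();
storage.copy(Storage.CopyRequest.of(tempBlob, finalBlob)).getResult();
storage.delete(tempBlob);
```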

So, in this approach, the recoverable state from Flink's perspective (i.e. 
CommitRecoverable and ResumeRecoverable) would include:
 * The RestorableState returned from WriteChannel
 * The write position (we have to keep track of this ourselves, because 
WriteChannel doesn't expose this)
 * The locations of the temporary and final blobs, so that we can ultimately 
commit or clean up the overall operation
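A minimal sketch of that recoverable state as a plain serializable class (all 
names here are hypothetical; the captured channel state is held as an opaque 
java.io.Serializable, since RestorableState is Serializable but otherwise 
opaque):

```java
import java.io.Serializable;

// Hypothetical sketch; class and field names are placeholders.
class GcsRecoverable implements Serializable {
    private final Serializable channelState; // RestorableState from WriteChannel.capture
    private final long position;             // bytes written so far (tracked by us)
    private final String tempBlobName;       // temporary upload location
    private final String finalBlobName;      // target passed to RecoverableWriter.open

    GcsRecoverable(Serializable channelState, long position,
                   String tempBlobName, String finalBlobName) {
        this.channelState = channelState;
        this.position = position;
        this.tempBlobName = tempBlobName;
        this.finalBlobName = finalBlobName;
    }

    Serializable getChannelState() { return channelState; }
    long getPosition()             { return position; }
    String getTempBlobName()       { return tempBlobName; }
    String getFinalBlobName()      { return finalBlobName; }
}
```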

That's basically it at a high level.

I do want to point out one possible conflict with the Flink coding guidelines, 
though, to get your thoughts. The guidelines say – very emphatically! –  not to 
use Java serialization for anything. In the GCS case, the RestorableState that 
is returned from WriteChannel.capture is an object that implements Serializable 
but is otherwise opaque. This object does need to be serialized/deserialized as 
part of the RecoverableWriter implementation, and it's not clear to me how to 
do that except by using Java serialization.
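Concretely, the round-trip I have in mind is plain Java serialization of the 
opaque object, something like the following (helper name is a placeholder; I'd 
expect Flink's SimpleVersionedSerializer to wrap calls like these):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: round-trip an opaque Serializable (such as the captured
// RestorableState) through Java serialization. Names are placeholders.
class OpaqueStateSerializer {

    static byte[] serialize(Serializable state) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(state);
        }
        return bos.toByteArray();
    }

    static Serializable deserialize(byte[] bytes)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Serializable) ois.readObject();
        }
    }
}
```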

========================================

I'll stop there for now, please let me know if this is the sort of information 
you're looking for here. I'm happy to drill into any area in more detail if 
you'd like, just let me know. Thanks again!


> Create RecoverableWriter for GCS
> --------------------------------
>
>                 Key: FLINK-11838
>                 URL: https://issues.apache.org/jira/browse/FLINK-11838
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.8.0
>            Reporter: Fokko Driesprong
>            Assignee: Galen Warren
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports resumable uploads, which we can use to create a RecoverableWriter 
> similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop-compatible interface: 
> https://github.com/apache/flink/pull/7519
> we've noticed that the current implementation relies heavily on renaming 
> files on commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS, so we would like to 
> implement a more GCS-native RecoverableWriter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)