galenwarren removed a comment on pull request #7915:
URL: https://github.com/apache/flink/pull/7915#issuecomment-771716133


   I'm working on a GCS FileSystem/RecoverableWriter implementation, and @tillrohrmann suggested I take over this ticket.
   
   The proposal is pretty straightforward:
   
   - Implement a GSFileSystemFactory and GSFileSystem using the normal service-provider mechanism. GSFileSystemFactory implements [FileSystemFactory](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/FileSystemFactory.html), supporting the ```gs``` scheme, and GSFileSystem extends [HadoopFileSystem](https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.html), constructing it with an instance of [GoogleHadoopFileSystem](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java). A rough sketch of this structure follows this list.
   - Write files to GCS using a two-step process. First, write a blob to a temporary location using the [resumable uploads](https://cloud.google.com/storage/docs/resumable-uploads) feature of GCS. On commit, copy that temporary blob to the "final" blob location and delete the temporary blob. Resumable uploads can be active for up to a week, and they support saving/restoring the upload state via the [WriteChannel](https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html) interface. Sketches of the commit and capture/restore steps appear further down in this comment.
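
   To make the first bullet concrete, here's a rough sketch of the factory structure. Only FileSystemFactory, HadoopFileSystem, and GoogleHadoopFileSystem are the real classes linked above; the package-level details are simplified, and the sketch returns a plain HadoopFileSystem where the real code would return the GSFileSystem subclass:

   ```java
   import java.io.IOException;
   import java.net.URI;

   import org.apache.flink.core.fs.FileSystem;
   import org.apache.flink.core.fs.FileSystemFactory;
   import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

   import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;

   public class GSFileSystemFactory implements FileSystemFactory {

       @Override
       public String getScheme() {
           return "gs";
       }

       @Override
       public FileSystem create(URI fsUri) throws IOException {
           // Delegate the basic FileSystem operations to the Hadoop GCS connector ...
           GoogleHadoopFileSystem googleHadoopFs = new GoogleHadoopFileSystem();
           googleHadoopFs.initialize(fsUri, new org.apache.hadoop.conf.Configuration());

           // ... the real GSFileSystem extends HadoopFileSystem and additionally
           // overrides createRecoverableWriter(); a plain HadoopFileSystem is
           // returned here only to keep the sketch self-contained.
           return new HadoopFileSystem(googleHadoopFs);
       }
   }
   ```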
 
   
   The serializable state used by RecoverableWriter is stored in the 
GSRecoverable class. This structure includes:
   - The restorable state returned by 
[WriteChannel.capture()](https://googleapis.dev/java/google-cloud-clients/0.90.0-alpha/com/google/cloud/WriteChannel.html#capture--)
   - The position in the write stream, i.e. number of bytes written
   - The write "plan", i.e. an instance of GSRecoverablePlan, which includes 
the temporary and final blob ids for the current write operation
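
   For reference, here is roughly the shape of that state (class and field names follow the description above; in the real code these would also implement the appropriate RecoverableWriter.ResumeRecoverable/CommitRecoverable interfaces, omitted here):

   ```java
   import java.io.Serializable;

   import com.google.cloud.RestorableState;
   import com.google.cloud.WriteChannel;
   import com.google.cloud.storage.BlobId;

   /** The temporary and final blob ids for one recoverable write. */
   class GSRecoverablePlan implements Serializable {
       final BlobId tempBlobId;
       final BlobId finalBlobId;

       GSRecoverablePlan(BlobId tempBlobId, BlobId finalBlobId) {
           this.tempBlobId = tempBlobId;
           this.finalBlobId = finalBlobId;
       }
   }

   /** Serializable state captured on each persist of the recoverable writer. */
   class GSRecoverable implements Serializable {
       final RestorableState<WriteChannel> writeChannelState; // from WriteChannel.capture()
       final long position;                                   // bytes written so far
       final GSRecoverablePlan plan;                          // temporary + final blob ids

       GSRecoverable(RestorableState<WriteChannel> writeChannelState, long position, GSRecoverablePlan plan) {
           this.writeChannelState = writeChannelState;
           this.position = position;
           this.plan = plan;
       }
   }
   ```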
   
   Three new Flink options are added:
   - **gs.upload.content.type**. This is the content type to use when writing 
to GCS. Default: ```application/octet-stream```.
   - **gs.upload.temp.bucket**. The bucket to use for temporary blobs. There is 
no default; if unspecified, the final blob bucket is used. Specifying this 
option would allow one to put temporary blobs in their own bucket.
   - **gs.upload.temp.prefix**. The prefix to apply to the final object name to 
generate the temporary object name. A UUID string is also appended to the 
temporary object name to avoid collisions during the resumable upload process. 
Default value: ```.inprogress```
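
   For clarity, these would be declared as ordinary Flink config options, along these lines (the keys and defaults are the ones above; the descriptions are illustrative):

   ```java
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;

   public class GSFileSystemOptions {

       public static final ConfigOption<String> UPLOAD_CONTENT_TYPE =
               ConfigOptions.key("gs.upload.content.type")
                       .stringType()
                       .defaultValue("application/octet-stream")
                       .withDescription("The content type used for blobs written to GCS.");

       public static final ConfigOption<String> UPLOAD_TEMP_BUCKET =
               ConfigOptions.key("gs.upload.temp.bucket")
                       .stringType()
                       .noDefaultValue()
                       .withDescription("The bucket for temporary blobs; if unset, the final blob's bucket is used.");

       public static final ConfigOption<String> UPLOAD_TEMP_PREFIX =
               ConfigOptions.key("gs.upload.temp.prefix")
                       .stringType()
                       .defaultValue(".inprogress")
                       .withDescription("The prefix applied to the final object name to form the temporary object name.");
   }
   ```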
   
   So, for example -- if a recoverable write were initiated to 
```gs://bucket-name/foo/bar```, the following would happen (assuming default 
values for the options above):
   - A temporary blob would be created at, say, 
```gs://bucket-name/.inprogress/foo/bar-e202b566-150f-4784-9025-136cf351fcfe```
   - Writes would be performed against this blob via resumable upload for up to 
a week, with support for saving and restoring the upload state
   - Upon commit, the temporary blob would be copied to 
```gs://bucket-name/foo/bar``` and the temporary blob would be deleted
   - Upon failure, the temporary blob would be deleted (via 
[RecoverableWriter.cleanupRecoverableState](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html#cleanupRecoverableState-org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable-))
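
   The commit/cleanup path above maps onto the GCS client like this (the helper method names are made up for illustration; the real logic lives in the recoverable writer/committer classes):

   ```java
   import java.util.UUID;

   import com.google.cloud.storage.BlobId;
   import com.google.cloud.storage.Storage;

   public class GSCommitSketch {

       // e.g. "foo/bar" -> ".inprogress/foo/bar-e202b566-150f-4784-9025-136cf351fcfe"
       static String tempObjectName(String finalObjectName, String tempPrefix) {
           return tempPrefix + "/" + finalObjectName + "-" + UUID.randomUUID();
       }

       // Commit: copy the temporary blob to the final location, then delete the temporary blob.
       static void commit(Storage storage, BlobId tempBlobId, BlobId finalBlobId) {
           storage.copy(Storage.CopyRequest.of(tempBlobId, finalBlobId)).getResult();
           storage.delete(tempBlobId);
       }

       // cleanupRecoverableState (failure/abort): just delete the temporary blob.
       static void cleanup(Storage storage, BlobId tempBlobId) {
           storage.delete(tempBlobId);
       }
   }
   ```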
   
   That's basically it. I have a working version of this locally with unit 
tests that is just about ready for a pull request -- I'll submit that PR and 
will link it here.
   
   Last, I do have a few questions that I'd be interested in thoughts on:
   - GoogleHadoopFileSystem is implemented in [gcs-connector](https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/gcs-connector); this is the same connector that can already be used in Flink to access the ```gs``` scheme for, say, checkpoints. The guidance I've seen elsewhere is that this jar file should be copied directly into the Flink lib folder and not pulled into a job jar file. So, I've currently added this dependency to ```flink-gs-fs-hadoop``` with ```provided``` scope. Does this make sense, or should I use ```compile``` scope?
   - Google's WriteChannel doesn't support flushing or syncing, so the ```flush``` and ```sync``` methods of [RecoverableFsDataOutputStream](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html) are no-ops. This strikes me as OK because WriteChannel *does* properly support capture/restore of the channel state, with persistence guarantees, which is what the recoverable writer uses to implement persist/recover (see the sketch after these questions). But I thought I'd mention it in case I have that wrong ...
   - I took a crack at updating the docs in file_sink.md and streaming_sink.md 
in /docs/dev/connectors, to indicate Google Storage support. I see that both of 
those files have Chinese translations; I'd need some help there. :)
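
   To illustrate the capture/restore point from the second question above, this is roughly how the WriteChannel is used for persist/recover (illustrative only; the blob name and data are placeholders):

   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;

   import com.google.cloud.RestorableState;
   import com.google.cloud.WriteChannel;
   import com.google.cloud.storage.BlobId;
   import com.google.cloud.storage.BlobInfo;
   import com.google.cloud.storage.Storage;
   import com.google.cloud.storage.StorageOptions;

   public class CaptureRestoreSketch {

       public static void main(String[] args) throws IOException {
           Storage storage = StorageOptions.getDefaultInstance().getService();
           BlobInfo tempBlob =
                   BlobInfo.newBuilder(BlobId.of("bucket-name", ".inprogress/foo/bar-<uuid>"))
                           .setContentType("application/octet-stream")
                           .build();

           // Start a resumable upload and write some data.
           WriteChannel channel = storage.writer(tempBlob);
           channel.write(ByteBuffer.wrap("first chunk".getBytes(StandardCharsets.UTF_8)));

           // persist(): capture the channel state -- this is the serializable piece
           // stored in GSRecoverable. There is no flush()/sync() equivalent here.
           RestorableState<WriteChannel> captured = channel.capture();

           // recover(): restore the channel from the captured state and keep writing.
           WriteChannel restored = captured.restore();
           restored.write(ByteBuffer.wrap("second chunk".getBytes(StandardCharsets.UTF_8)));
           restored.close();
       }
   }
   ```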
   
   Any feedback welcome, of course -- thanks.

