[ https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282191#comment-17282191 ]

Xintong Song commented on FLINK-11838:
--------------------------------------

Thanks [~galenwarren],

h4. Java Serialization

According to the veterans, the biggest concern with Java serialization is indeed 
the compatibility issue. Relying on Java serialization of classes provided by 
the GCS Java library means that, whenever we bump the version of the library, 
we may break checkpoint/savepoint compatibility if the library contains 
incompatible changes. And from what I understand, GCS is not designed to 
support resumable uploading across versions, which means they may not pay much 
attention to avoiding incompatible changes.
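
To make the alternative concrete: instead of Java-serializing library objects, 
the recoverable state could be a small POJO with an explicit, versioned wire 
format that survives library upgrades. This is only a sketch; the class name 
and fields are hypothetical, not the actual implementation:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch: serialize only our own stable fields with an
// explicit version number, never library-internal classes.
final class GcsRecoverableSerializer {
    private static final int VERSION = 1;

    byte[] serialize(String bucket, String objectName, long position)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);   // explicit version for compatibility checks
            out.writeUTF(bucket);    // only fields we define and control
            out.writeUTF(objectName);
            out.writeLong(position);
        }
        return bytes.toByteArray();
    }
}
{code}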

h4. Switching to normal upload

In general, I think the one-week limitation and the lack of a compatibility 
commitment both suggest that GCS resumable uploading is not designed for 
long-period or cross-version use cases like Flink's checkpoints/savepoints. In 
that sense, I think switching to the normal upload and maintaining the 
recoverability ourselves makes sense.

h4. A few questions on the details

bq. The RecoverableFsDatastream could buffer written data and write temporary 
blobs at various points (flush, sync, persist, close, or even write, if the 
buffered size exceeds some threshold). 

Wouldn't it be good enough to complete a temporary blob (and start a new one if 
necessary) when {{persist}} is called? IIUC, we can still use a {{WriteChannel}} 
for uploading a temporary blob, let it decide when to flush buffered data to 
GCS, and close it when {{persist}} is called.
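
Here is a minimal sketch of that pattern with the google-cloud-storage client's 
{{WriteChannel}}; the surrounding class, the bucket name, and the temp-blob 
naming scheme are assumptions for illustration only:

{code:java}
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.WriteChannel;

import java.io.IOException;
import java.nio.ByteBuffer;

// Sketch: one WriteChannel per temporary blob, completed on persist().
class TempBlobWriter {
    private final Storage storage;
    private WriteChannel channel;
    private int tempBlobIndex;

    TempBlobWriter(Storage storage) {
        this.storage = storage;
    }

    void write(ByteBuffer data) throws IOException {
        if (channel == null) {
            // Start a new temporary blob; the WriteChannel decides
            // internally when to flush its buffer to GCS.
            BlobInfo tempBlob = BlobInfo.newBuilder(
                BlobId.of("my-bucket", ".inprogress/part-" + tempBlobIndex++))
                .build();
            channel = storage.writer(tempBlob);
        }
        while (data.hasRemaining()) {
            channel.write(data);
        }
    }

    void persist() throws IOException {
        if (channel != null) {
            channel.close(); // completes the temp blob; next write starts a new one
            channel = null;
        }
    }
}
{code}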

bq. and on commit the temporary files would be combined into one file and 
copied to the final blob location.

An alternative could be combining temporary blobs in advance, e.g., whenever we 
have 32 temporary blobs (the maximum number of source objects a single 
{{compose}} call accepts). I'm not entirely sure whether this reduces the 
workload of the final commit when there is a large number of temporary blobs. 
Maybe we can conduct some simple experiments to see how the performance of GCS 
{{compose}} relates to the number and size of the objects, before deciding when 
and how to combine the temporary blobs.
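
For reference, a single {{compose}} round could look roughly like the sketch 
below. Since each call takes at most 32 sources, larger sets would need 
multiple rounds (e.g., a tree of composes); the helper class and blob names 
here are hypothetical:

{code:java}
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

import java.util.List;

// Sketch: combine up to 32 temporary blobs into one target object.
class BlobComposer {
    private final Storage storage;

    BlobComposer(Storage storage) {
        this.storage = storage;
    }

    Blob composeBatch(String bucket, List<String> tempBlobNames, String target) {
        Storage.ComposeRequest.Builder builder = Storage.ComposeRequest.newBuilder()
            .setTarget(BlobInfo.newBuilder(bucket, target).build());
        // compose accepts at most 32 source objects per request.
        int batch = Math.min(32, tempBlobNames.size());
        for (String name : tempBlobNames.subList(0, batch)) {
            builder.addSource(name);
        }
        return storage.compose(builder.build());
    }
}
{code}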

bq. In this approach, we'd probably need a Flink option to set the maximum size 
of a temporary blob, which would control how frequently temporary blobs would 
be created. Tuning this would involve a tradeoff between memory usage and 
checkpoint size on the Flink side vs. time to commit the writes via the 
recoverable writer, as I'd expect it to take longer to compose a larger number 
of smaller temporary blobs together into the final blob than it would a smaller 
number of larger temporary blobs.

I'm not sure how the checkpoint size is related. A checkpoint should be 
considered complete only if the temporary blobs are successfully persisted, so 
the checkpoint should not contain any buffered, not-yet-written data. Maybe we 
should simply rely on the streaming file sink operator and the checkpoint 
mechanism to decide how frequently the data should be persisted / flushed / 
synced, rather than introducing another set of tuning knobs.

> Create RecoverableWriter for GCS
> --------------------------------
>
>                 Key: FLINK-11838
>                 URL: https://issues.apache.org/jira/browse/FLINK-11838
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / FileSystem
>    Affects Versions: 1.8.0
>            Reporter: Fokko Driesprong
>            Assignee: Galen Warren
>            Priority: Major
>              Labels: pull-request-available, usability
>             Fix For: 1.13.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> GCS supports the resumable upload which we can use to create a Recoverable 
> writer similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop compatible interface: 
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on the renaming 
> of the files on the commit: 
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore we would like to 
> implement a more GCS-native RecoverableWriter.


