[
https://issues.apache.org/jira/browse/FLINK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280587#comment-17280587
]
Galen Warren commented on FLINK-11838:
--------------------------------------
Thanks. All of your initial assumptions are correct, though I might add that it
is straightforward to serialize/deserialize a {{RestorableState<WriteChannel>}}
if Java serialization is allowed (i.e., the object is guaranteed to implement
{{Serializable}}). But I understand that's not preferred.
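For reference, that serde would only be a few lines, assuming (as the
google-cloud-java implementations do) that the captured state implements
{{Serializable}}; the class name here is illustrative:
{code:java}
import com.google.cloud.RestorableState;
import com.google.cloud.WriteChannel;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

final class WriteChannelSerde {

    // Capture and serialize the channel's state, e.g. on checkpoint.
    static byte[] serialize(WriteChannel channel) throws IOException {
        RestorableState<WriteChannel> state = channel.capture();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        return bytes.toByteArray();
    }

    // Deserialize the state and restore a usable channel, e.g. on recovery.
    @SuppressWarnings("unchecked")
    static WriteChannel restore(byte[] serialized) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            return ((RestorableState<WriteChannel>) in.readObject()).restore();
        }
    }
}
{code}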
To answer your questions:
*Does "writing a blob to a temporary location" mean that the user always needs
to configure a temporary location? How is the temporary location cleaned, say
if they're never moved to the committed location?*
I was thinking of managing temporary locations through a couple of Flink options:
* First, a "prefix" option: a string prepended to the supplied "final" blob
name. This prefix could default to something like ".inprogress/".
* Second, an option for the bucket to use for temp blobs, which would not be
required. If not supplied, temp blobs would be written to the same bucket as
their associated final blobs.
I was also planning to append a UUID string to the end of each temp blob
location, to guarantee uniqueness during the temp-blob writing phase in case
multiple writes to the same final blob were somehow in progress at the same
time.
So, with the defaults, a write to a blob {{gs://final_bucket/foo/bar}} would
use a temp blob location of, say,
{{gs://final_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}.
The prefix could be configured via the first option; and if the user wanted to
write all temp blobs to a "temp_bucket" bucket, that bucket could be specified
via the second option, yielding
{{gs://temp_bucket/.inprogress/foo/bar/1157f422-32af-4e32-a797-2a0a05f28ecf}}
(see the sketch below).
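To make the derivation concrete, here's a minimal sketch of how a temp blob
URI could be computed from the two options; the class and method names are
hypothetical, not part of any existing Flink or GCS API:
{code:java}
import java.util.UUID;

// Hypothetical helper; option names and defaults are illustrative only.
final class TempBlobLocations {

    static final String DEFAULT_PREFIX = ".inprogress/";

    // finalBucket/finalName identify the committed blob; tempBucket may be
    // null, in which case the final bucket is reused for temp blobs.
    static String tempBlobUri(
            String finalBucket, String finalName, String prefix, String tempBucket) {
        String bucket = (tempBucket != null) ? tempBucket : finalBucket;
        // e.g. gs://final_bucket/.inprogress/foo/bar/<uuid>
        return String.format("gs://%s/%s%s/%s", bucket, prefix, finalName, UUID.randomUUID());
    }
}
{code}
For example, {{tempBlobUri("final_bucket", "foo/bar", ".inprogress/", null)}}
would produce the first form above, and passing {{"temp_bucket"}} as the last
argument would produce the second.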
As for cleaning up temporary blobs, is that what
[RecoverableWriter.cleanupRecoverableState|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html#cleanupRecoverableState-org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable-]
does? I was planning to clean up any temporary blobs there. Beyond that, if a
separate bucket for temporary blobs were used, one could apply an
[object lifecycle-management
rule|https://cloud.google.com/storage/docs/lifecycle] that deletes blobs after
some period of time. These rules appear to apply only at the bucket level, so
this would only work if a separate bucket were used exclusively for temporary
blobs.
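If we went that route, setting up such a rule could look roughly like the
following, using the google-cloud-storage client (the bucket name and
seven-day threshold are just examples; an aggressive threshold would delete
in-progress blobs for long-paused jobs, mirroring the S3 caveat quoted below):
{code:java}
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.BucketInfo.LifecycleRule;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleAction;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleCondition;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.util.Collections;

public class TempBucketLifecycle {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        // Delete any object in the temp bucket that is more than 7 days old.
        LifecycleRule deleteAfterSevenDays = new LifecycleRule(
                LifecycleAction.newDeleteAction(),
                LifecycleCondition.newBuilder().setAge(7).build());
        storage.update(BucketInfo.newBuilder("temp_bucket")
                .setLifecycleRules(Collections.singletonList(deleteAfterSevenDays))
                .build());
    }
}
{code}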
*Per [this doc|https://cloud.google.com/storage/docs/resumable-uploads], a
resumable upload must be completed within a week. This could be surprising for
the users, if they try to restore a job from checkpoints/savepoints after
pausing for more than one week.*
Yes, I would propose disclosing this limitation via a disclaimer, similar to
the one used for S3:
{quote}Important Note 2: To guarantee exactly-once semantics while being
efficient, the {{StreamingFileSink}} uses the [Multi-part
Upload|https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html]
feature of S3 (MPU from now on). This feature allows to upload files in
independent chunks (thus the "multi-part") which can be combined into the
original file when all the parts of the MPU are successfully uploaded. For
inactive MPUs, S3 supports a bucket lifecycle rule that the user can use to
abort multipart uploads that don't complete within a specified number of days
after being initiated. This implies that if you set this rule aggressively and
take a savepoint with some part-files being not fully uploaded, their
associated MPUs may time-out before the job is restarted. This will result in
your job not being able to restore from that savepoint as the pending
part-files are no longer there and Flink will fail with an exception as it
tries to fetch them and fails.
{quote}
Admittedly, S3 does seem to provide more configuration options here than GCS.
It would be nice if the one-week limit were configurable, but based on my
reading, it doesn't seem to be.
*Relying on Java serialization means tying our compatibility to the
compatibility of GCS, which should be avoided if possible. Would it be possible
to work directly with the REST API and session URI? IIUC this is how the write
channel works internally.*
I'd need to look into it more closely, but yes, I think this could be possible.
We'd likely wind up reimplementing much of what is done in
[BlobWriteChannel|https://github.com/googleapis/java-storage/blob/master/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java]
and
[BaseWriteChannel|https://github.com/googleapis/java-core/blob/master/google-cloud-core/src/main/java/com/google/cloud/BaseWriteChannel.java],
which I had been hoping to avoid. The retry/flush logic in
{{BlobWriteChannel}} looks a bit complicated, but perhaps we wouldn't need all
of that complexity in the Flink case?
If we were to go this route, is there a preferred client to use in Flink for
HTTP/REST requests?
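For concreteness, starting a resumable-upload session directly against the
JSON API might look roughly like this sketch (credential acquisition omitted;
{{HttpURLConnection}} is just a stand-in for whatever HTTP client is
preferred). The key point is that the session URI is a plain string, so it
could be persisted in the recoverable state without Java serialization:
{code:java}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ResumableSessionSketch {

    // POST to the JSON API's resumable-upload endpoint; the response's
    // Location header carries the session URI used for subsequent uploads.
    static String startSession(String bucket, String objectName, String accessToken)
            throws Exception {
        String endpoint = "https://storage.googleapis.com/upload/storage/v1/b/" + bucket
                + "/o?uploadType=resumable&name="
                + URLEncoder.encode(objectName, StandardCharsets.UTF_8.name());
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Authorization", "Bearer " + accessToken);
        conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
        conn.setDoOutput(true);
        try (OutputStream body = conn.getOutputStream()) {
            body.write("{}".getBytes(StandardCharsets.UTF_8)); // empty object metadata
        }
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
            throw new IllegalStateException("Could not start session: " + conn.getResponseCode());
        }
        return conn.getHeaderField("Location"); // the resumable session URI
    }
}
{code}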
> Create RecoverableWriter for GCS
> --------------------------------
>
> Key: FLINK-11838
> URL: https://issues.apache.org/jira/browse/FLINK-11838
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / FileSystem
> Affects Versions: 1.8.0
> Reporter: Fokko Driesprong
> Assignee: Galen Warren
> Priority: Major
> Labels: pull-request-available, usability
> Fix For: 1.13.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> GCS supports resumable uploads, which we can use to create a RecoverableWriter
> similar to the S3 implementation:
> https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
> After using the Hadoop-compatible interface:
> https://github.com/apache/flink/pull/7519
> We've noticed that the current implementation relies heavily on renaming
> files on commit:
> https://github.com/apache/flink/blob/master/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java#L233-L259
> This is suboptimal on an object store such as GCS. Therefore, we would like to
> implement a more GCS-native RecoverableWriter.