[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828081#comment-17828081 ]

Galen Warren edited comment on FLINK-34696 at 3/18/24 7:21 PM:
---------------------------------------------------------------

Re: {*}Composition{*}: My understanding is that Flink recoverable writers are 
supposed to follow a few rules, one of which is that data should not be visible 
in the final destination until a successful commit occurs. From the description 
of the 
[RecoverableWriter|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableWriter.html]:
{quote}The RecoverableWriter creates and recovers 
[{{RecoverableFsDataOutputStream}}|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html].
 It can be used to write data to a file system in a way that the writing can be 
resumed consistently after a failure and recovery without loss of data or 
possible duplication of bytes.

The streams do not make the files they write to immediately visible, but 
instead write to temp files or other temporary storage. To publish the data 
atomically in the end, the stream offers the 
[{{RecoverableFsDataOutputStream.closeForCommit()}}|https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/fs/RecoverableFsDataOutputStream.html#closeForCommit--]
 method to create a committer that publishes the result.
{quote}
This atomicity is required, as far as I know, so writing directly to the final blob along the way wouldn't really work.
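
To illustrate the contract, here is a minimal sketch of what the flow looks like from the caller's side, written against Flink's generic filesystem API (the bucket and path names are made up):
{code:java}
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

public class RecoverableWriteSketch {
    public static void main(String[] args) throws Exception {
        // Made-up target path; nothing is visible here until the commit below.
        Path target = new Path("gs://example-bucket/output/part-0");
        FileSystem fs = FileSystem.get(URI.create("gs://example-bucket/"));

        RecoverableWriter writer = fs.createRecoverableWriter();

        // Bytes go to temporary storage (temporary blobs, in the GCS case).
        RecoverableFsDataOutputStream out = writer.open(target);
        out.write("some data".getBytes(StandardCharsets.UTF_8));

        // closeForCommit() finalizes the temporary data; only commit()
        // publishes it atomically at the target path.
        RecoverableFsDataOutputStream.Committer committer = out.closeForCommit();
        committer.commit();
    }
}
{code}
In the GCS case, the "temporary storage" is the set of temporary blobs that eventually get composed; the commit is what performs the final compose and publishes the result at the target path.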

(Though, as I mentioned above, I do think it would be possible to improve the efficiency of the blob composition somewhat in terms of total data storage required in cases where there are lots of files to be composed, even when using intermediate files.)

Re: *Optimization of the Recovery Mechanism:* Yes, it would be possible to delete some of the intermediate composed blobs along the way, i.e. any that are not "root" temporary blobs. They are currently deleted at the end of the commit. I suppose that doing it along the way would cause those blobs to live a _slightly_ shorter period of time and not overlap as much. Just so I understand, what is the issue? Blob storage costs are prorated pretty granularly, so I wouldn't think that short window of time would affect costs much, but I could be wrong about that. Are you running into cost issues, total storage limits, or something else during the current bursts?
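
To make sure we're talking about the same change, here is a rough sketch of that idea using the GCS Java client (this is not the actual GSRecoverableWriterCommitter code; the method, bucket, and prefix names are made up). Each round composes the previous intermediate with up to 31 more source blobs and deletes that previous intermediate immediately instead of waiting for the end of the commit:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;

public class ComposeWithEagerCleanup {

    // GCS allows at most 32 source objects per compose call.
    private static final int MAX_SOURCES_PER_COMPOSE = 32;

    static void composeAndCleanUp(
            Storage storage, String bucket, List<String> sources, String finalBlob) {

        List<String> remaining = new ArrayList<>(sources);
        String previousIntermediate = null;

        while (!remaining.isEmpty()) {
            List<String> batch = new ArrayList<>();
            if (previousIntermediate != null) {
                batch.add(previousIntermediate);
            }
            while (batch.size() < MAX_SOURCES_PER_COMPOSE && !remaining.isEmpty()) {
                batch.add(remaining.remove(0));
            }

            boolean lastRound = remaining.isEmpty();
            String target = lastRound
                    ? finalBlob
                    : ".inprogress/intermediate-" + UUID.randomUUID();

            storage.compose(Storage.ComposeRequest.newBuilder()
                    .addSource(batch)
                    .setTarget(BlobInfo.newBuilder(bucket, target).build())
                    .build());

            // The previous intermediate has been folded into the new target,
            // so it can be deleted now rather than at the end of the commit.
            // The original ("root") temporary blobs are left for the normal
            // post-commit cleanup.
            if (previousIntermediate != null) {
                storage.delete(BlobId.of(bucket, previousIntermediate));
            }
            previousIntermediate = target;
        }
    }
}
{code}
With that change, at any point only one intermediate blob exists alongside the original temporary blobs, so the peak extra storage stays roughly bounded by the size of the final output rather than accumulating across rounds.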

Re: *Handling the 5 TB Blob Limit:* Yes, it would be possible to write two blobs to stay under this limit when composing. I'm not completely sure what that means in terms of the atomicity requirement, though; I don't think there's any way to do multiple atomic writes in GCP, so there would be a possibility that data could appear and then disappear, e.g. if the write of final blob #1 succeeded, the write of final blob #2 failed, and both were then deleted because of the failed commit. But I'm not sure what other options exist. It would be interesting to know how the S3 recoverable writer handles similar limits.
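
If we did go that route, the split itself would be straightforward. Something like the following (again just an illustrative sketch with made-up names, using the GCS Java client) could group the temporary blobs so that each final blob stays under the object-size limit; the atomicity question above remains the real problem, since each group would still be published by a separate compose call:
{code:java}
import java.util.ArrayList;
import java.util.List;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;

public class SplitFinalBlobs {

    // GCS maximum object size (5 TiB).
    private static final long MAX_OBJECT_SIZE = 5L * 1024 * 1024 * 1024 * 1024;

    // Groups the temporary blobs so that each group's combined size stays
    // under the object-size limit; each group would then be composed into
    // its own final blob (e.g. "part-0", "part-0.1", ...).
    static List<List<String>> partitionBySize(
            Storage storage, String bucket, List<String> tempBlobs) {

        List<List<String>> groups = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentSize = 0;

        for (String name : tempBlobs) {
            Blob blob = storage.get(BlobId.of(bucket, name));
            long size = blob.getSize();
            if (!current.isEmpty() && currentSize + size > MAX_OBJECT_SIZE) {
                groups.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(name);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
}
{code}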

Re: *Rationale Behind Blob Composition:* see above.

Re: *TTL and temporary bucket:* Could you start without a defined TTL on the bucket while gathering the necessary data about the app, and then turn it on later? Or you could leave the temporary bucket unset entirely, in which case the temp files go into the main bucket, just in a different location.
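
For reference, turning the TTL on later is just a lifecycle rule on the temporary bucket (the one configured via gs.writer.temporary.bucket.name, if I'm remembering the option name correctly). Here's a sketch with a made-up bucket name and an arbitrary 7-day age, using the GCS Java client:
{code:java}
import java.util.Collections;

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo.LifecycleRule;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleAction;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleCondition;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class AddTempBucketTtl {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();

        // Delete objects in the temporary bucket once they are older than 7 days.
        LifecycleRule deleteAfterSevenDays = new LifecycleRule(
                LifecycleAction.newDeleteAction(),
                LifecycleCondition.newBuilder().setAge(7).build());

        Bucket tempBucket = storage.get("example-flink-temp-bucket");
        tempBucket.toBuilder()
                .setLifecycleRules(Collections.singletonList(deleteAfterSevenDays))
                .build()
                .update();
    }
}
{code}
The same rule can be set through the console or gcloud; the point is just that it can be added or changed after the fact without touching the Flink job.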



> GSRecoverableWriterCommitter is generating excessive data blobs
> ---------------------------------------------------------------
>
>                 Key: FLINK-34696
>                 URL: https://issues.apache.org/jira/browse/FLINK-34696
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: Simon-Shlomo Poil
>            Priority: Major
>
> The `composeBlobs` method in 
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to 
> merge multiple small blobs into a single large blob using Google Cloud 
> Storage's compose method. This process is iterative, combining the result 
> from the previous iteration with 31 new blobs until all blobs are merged. 
> Upon completion of the composition, the method proceeds to remove the 
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption 
> during the blob composition process, incurring considerable costs due to 
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
>  - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
>  - After 1st step: 32 blobs are merged into a single blob, increasing total 
> storage to 96 GB (64 original + 32 GB new).
>  - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, 
> raising the total to 159 GB.
>  - After 3rd step: The final blob is merged, culminating in a total of 223 GB 
> to combine the original 64 GB of data. This results in an overhead of 159 GB.
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes 
> of data can incur overheads in the petabyte range, leading to unexpectedly 
> high costs. Additionally, we have observed an increase in storage exceptions 
> thrown by the Google Storage library, potentially linked to this issue.
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to 
> immediately delete source blobs once they have been successfully combined. 
> This change could significantly reduce data duplication and associated costs. 
> However, the implications for data recovery and integrity need careful 
> consideration to ensure that this optimization does not compromise the 
> ability to recover data in case of a failure during the composition process.
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant 
> number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively 
> combined.
> 3. Note the final amount of data storage used compared to the initial total 
> size of the blobs.
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use, 
> efficiently managing resources to combine blobs without generating excessive 
> temporary data overhead.
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data 
> storage, leading to high costs and potential system instability due to 
> frequent storage exceptions.