[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827568#comment-17827568 ]

Galen Warren edited comment on FLINK-34696 at 3/15/24 5:49 PM:
---------------------------------------------------------------

No, this just moves the storage of the temporary files to a different bucket so 
that a lifecycle policy can be applied to them. The intermediate blobs still 
have to be composed into the final blob either way. And yes there is one extra 
copy if you use the temp bucket, but the benefit is you won't orphan temporary 
files (more below).

Upon rereading and looking back at the code (it's been a while!), I may have 
misunderstood the initial issue. There could be a couple things going on here.

1) There could be orphaned temporary blobs sticking around, because they were 
written at one point and then, due to restores from check/savepoints, were 
"forgotten". Writing all intermediate files to a temporary bucket and then 
applying a lifecycle policy to that bucket would address that issue. It does 
come at the cost of one extra copy operation; in GCP blobs cannot be composed 
into a different bucket, so using the temporary bucket incurs one extra copy 
that doesn't occur if the temporary bucket is the same as the final bucket.

2) There could be large numbers of temporary blobs being composed together, 
i.e. the writing pattern is such that large numbers of writes occur between 
checkpoints. Since there's a limit to how many blobs can be composed together 
at one time (32), there's no way in principle to avoid a temporary blob's data 
being involved in more than one compose operation, but I do think the 
composition could be optimized differently.
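
For context, a single GCS compose call accepts at most 32 source objects, which is 
what forces this multi-round merging in the first place. A rough sketch of one such 
call with the google-cloud-storage Java client (the bucket and object names here are 
made up):

{code:java}
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ComposeExample {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();

        // A single compose request: up to 32 sources merged into one target object.
        Storage.ComposeRequest.Builder builder =
                Storage.ComposeRequest.newBuilder()
                        .setTarget(BlobInfo.newBuilder("my-bucket", "composed/part-0").build());
        for (int i = 0; i < 32; i++) { // 32 is the per-request source limit
            builder.addSource("tmp/blob-" + i);
        }
        storage.compose(builder.build());
    }
}
{code}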

I think #2 is the issue here, not #1? Though one should also be careful about 
orphaned blobs if state is ever restored from check/savepoints.
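
A minimal sketch of the kind of lifecycle rule point 1 refers to, using the 
google-cloud-storage Java client; the bucket name and the 7-day age are placeholders:

{code:java}
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.BucketInfo.LifecycleRule;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleAction;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleCondition;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.util.Collections;

public class TempBucketLifecycle {
    public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        // Delete any object in the temporary bucket once it is older than 7 days,
        // so temporary blobs orphaned by a restore are eventually cleaned up.
        storage.update(
                BucketInfo.newBuilder("my-flink-temp-bucket")
                        .setLifecycleRules(
                                Collections.singletonList(
                                        new LifecycleRule(
                                                LifecycleAction.newDeleteAction(),
                                                LifecycleCondition.newBuilder().setAge(7).build())))
                        .build());
    }
}
{code}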

Regarding optimization: Currently, the composing of blobs occurs 
[here|https://github.com/apache/flink/blob/f6e1b493bd6292a87efd130a0e76af8bd750c1c9/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/writer/GSRecoverableWriterCommitter.java#L131].

It does so in a way that minimizes the number of compose operations but not 
necessarily the total volume of data involved in those compose operations, which 
I think is the issue here.

Consider the situation where there are 94 temporary blobs to be composed into a 
committed blob. As it stands now, the first 32 would be composed into 1 blob, 
leaving 1 + (94 - 32) = 63 blobs remaining. This process would be repeated, 
composing the first 32 again into 1, leaving 1 + (63 - 32) = 32 blobs. Then 
these remaining 32 blobs would be composed into 1 final blob. So, 3 total 
compose operations.
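
To make the cascading behavior concrete, here is a small stand-alone simulation (not 
the actual Flink code) that counts compose operations and the total data volume they 
touch, assuming 94 temporary blobs of 1 size unit each:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

public class CascadingComposeSim {
    static final int COMPOSE_LIMIT = 32; // GCS limit on sources per compose

    public static void main(String[] args) {
        Deque<Long> blobs = new ArrayDeque<>();
        for (int i = 0; i < 94; i++) {
            blobs.add(1L); // each temporary blob is 1 size unit
        }

        long composeOps = 0;
        long unitsComposed = 0;
        while (blobs.size() > 1) {
            // Compose up to 32 blobs, always including the previously composed one...
            long composedSize = 0;
            int take = Math.min(COMPOSE_LIMIT, blobs.size());
            for (int i = 0; i < take; i++) {
                composedSize += blobs.poll();
            }
            // ...and put the result back at the front for the next round.
            blobs.addFirst(composedSize);
            composeOps++;
            unitsComposed += composedSize;
        }
        // For 94 one-unit blobs this prints ops=3, units=189 (32 + 63 + 94).
        System.out.println("ops=" + composeOps + " units=" + unitsComposed);
    }
}
{code}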

This could be done another way – the first 32 blobs could be composed into 1 
blob, the second 32 blobs could be composed into 1 blob, and the third 30 blobs 
could be composed into 1 blob, resulting in three intermediate blobs that would 
then be composed into the final blob. So, 4 total compose operations in this 
case. Still potentially recursive with large numbers of blobs due to the 
32-blob limit, but this would avoid the piling up of data in the first blob 
when there are large numbers of temporary blobs.
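
A rough sketch of that alternative under the same assumptions (94 one-unit blobs; 
compose independent groups of up to 32, then compose the intermediates, recursing 
if more than 32 of them remain):

{code:java}
import java.util.ArrayList;
import java.util.List;

public class BatchedComposeSim {
    static final int COMPOSE_LIMIT = 32; // GCS limit on sources per compose
    static long composeOps = 0;
    static long unitsComposed = 0;

    static long composeAll(List<Long> blobs) {
        if (blobs.size() == 1) {
            return blobs.get(0);
        }
        // Compose independent groups of up to 32 blobs into intermediate blobs.
        List<Long> intermediates = new ArrayList<>();
        for (int i = 0; i < blobs.size(); i += COMPOSE_LIMIT) {
            List<Long> group = blobs.subList(i, Math.min(i + COMPOSE_LIMIT, blobs.size()));
            long size = group.stream().mapToLong(Long::longValue).sum();
            composeOps++;
            unitsComposed += size;
            intermediates.add(size);
        }
        // Recurse: with more than 32 intermediates this takes another level.
        return composeAll(intermediates);
    }

    public static void main(String[] args) {
        List<Long> blobs = new ArrayList<>();
        for (int i = 0; i < 94; i++) {
            blobs.add(1L);
        }
        composeAll(blobs);
        // For 94 one-unit blobs this prints ops=4, units=188 (32 + 32 + 30 + 94);
        // the gap versus the cascading strategy widens quickly as the blob count grows.
        System.out.println("ops=" + composeOps + " units=" + unitsComposed);
    }
}
{code}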

Even though the second method would generally involve more total compose 
operations than the first, it would minimize the total bytes being composed. I 
doubt the difference would be significant for small numbers of temporary blobs, 
but it could help with large numbers of temporary blobs. That would seem like a 
reasonable enhancement to me.


> GSRecoverableWriterCommitter is generating excessive data blobs
> ---------------------------------------------------------------
>
>                 Key: FLINK-34696
>                 URL: https://issues.apache.org/jira/browse/FLINK-34696
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: Simon-Shlomo Poil
>            Priority: Major
>
> The `composeBlobs` method in 
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to 
> merge multiple small blobs into a single large blob using Google Cloud 
> Storage's compose method. This process is iterative, combining the result 
> from the previous iteration with 31 new blobs until all blobs are merged. 
> Upon completion of the composition, the method proceeds to remove the 
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption 
> during the blob composition process, incurring considerable costs due to 
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
>  - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
>  - After 1st step: 32 blobs are merged into a single blob, increasing total 
> storage to 96 GB (64 original + 32 GB new).
>  - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, 
> raising the total to 159 GB.
>  - After 3rd step: The final blob is merged, culminating in a total of 223 GB 
> to combine the original 64 GB of data. This results in an overhead of 159 GB.
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes 
> of data can incur overheads in the petabyte range, leading to unexpectedly 
> high costs. Additionally, we have observed an increase in storage exceptions 
> thrown by the Google Storage library, potentially linked to this issue.
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to 
> immediately delete source blobs once they have been successfully combined. 
> This change could significantly reduce data duplication and associated costs. 
> However, the implications for data recovery and integrity need careful 
> consideration to ensure that this optimization does not compromise the 
> ability to recover data in case of a failure during the composition process.
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant 
> number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively 
> combined.
> 3. Note the final amount of data storage used compared to the initial total 
> size of the blobs.
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use, 
> efficiently managing resources to combine blobs without generating excessive 
> temporary data overhead.
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data 
> storage, leading to high costs and potential system instability due to 
> frequent storage exceptions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
