[ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828249#comment-17828249
 ] 

Simon-Shlomo Poil edited comment on FLINK-34696 at 3/19/24 10:01 AM:
---------------------------------------------------------------------

Dear Galen,

 

*Composition:* The requirement that the final blob must not be written before 
the end can be satisfied by using a staging blob.

For example (pseudocode):
{code:java}
// Pseudocode: append everything into a single staging blob and only write the
// final blob in the very last compose step.
GSBlobIdentifier stagingBlob = blobContainer.getOneBlob();
while (blobContainer.hasNext()) {
  // Each block holds at most 31 new blobs so that, together with the staging
  // blob, we stay within GCS's limit of 32 source objects per compose call.
  List<GSBlobIdentifier> listOfBlobs = blobContainer.getNextBlock();
  listOfBlobs.add(0, stagingBlob);
  if (blobContainer.lastBlock()) {
    // Last iteration: compose directly into the final blob.
    Storage.compose(listOfBlobs, finalBlob);
  } else {
    // Otherwise reuse the staging blob as both a source and the target.
    Storage.compose(listOfBlobs, stagingBlob);
  }
}{code}
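
For reference, one append step of this pattern should map onto the 
google-cloud-storage Java client roughly as below (a minimal sketch with 
made-up bucket and blob names; not the connector's actual code):
{code:java}
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.util.ArrayList;
import java.util.List;

public class StagingComposeSketch {

  /** Appends up to 31 component blobs to the staging blob in one compose call. */
  static void appendToStaging(
      Storage storage, String bucket, String stagingName, List<String> componentNames) {
    List<String> sources = new ArrayList<>();
    sources.add(stagingName);        // the existing staging blob goes first
    sources.addAll(componentNames);  // then at most 31 new component blobs

    Storage.ComposeRequest request =
        Storage.ComposeRequest.newBuilder()
            .addSource(sources)
            // The target is the staging blob itself, so it grows in place.
            .setTarget(BlobInfo.newBuilder(bucket, stagingName).build())
            .build();
    storage.compose(request);
  }

  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    appendToStaging(
        storage, "my-bucket", "staging-blob", List.of("component-0", "component-1"));
  }
}
{code}
As far as I can tell, compose accepts the target object as one of its sources, 
which is what makes this in-place append possible.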
This approach avoids creating intermediate composition blobs entirely, so the 
peak storage is at most double the data size: the original component blobs 
plus the growing staging blob.
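
To put numbers on that, here is a back-of-the-envelope comparison for the 
64 x 1 GB example from the issue description (a simple model of the two 
strategies, not measured figures):
{code:java}
public class ComposeOverhead {
  public static void main(String[] args) {
    long n = 64, s = 1; // 64 blobs of 1 GB each

    // Current composeBlobs: intermediate blobs of 32s, 63s, ... plus the final blob.
    long written = 0, composed = 32;
    while (composed < n) {
      written += composed * s;
      composed += 31;
    }
    written += n * s;
    System.out.println("current approach writes " + written + " GB of extra data"); // 159

    // Staging-blob approach: no intermediates; the staging/final blob grows to n*s,
    // so the extra storage is bounded by the data size itself.
    System.out.println("staging approach needs at most " + (n * s) + " GB extra"); // 64
  }
}
{code}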

 

*Recovery and deletion of blobs:* 

For the recovery and deletion of temporary blobs, it is important to consider 
scenarios without checkpointing, or in BATCH mode, where {{closeForCommit}} 
operates on the entire blob list. Our current workload handles roughly 
4.5 million blobs and therefore generates a very large number of temporary 
composition blobs. So no, we do not see short "bursts" of storage usage, but 
rather elevated storage usage over a relatively long period. To streamline 
this, we could modify {{GSCommitRecoverable}} to update its list of 
{{componentObjectIds}}, removing blobs that have already been appended to the 
{{stagingBlob}}. This adjustment would keep data integrity intact in recovery 
situations without duplicating blobs.
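
A rough sketch of what that pruning could look like; the {{composeAndPersist}} 
hook here is hypothetical and stands in for "compose this block into the 
staging blob, then persist the shortened list so recovery only sees blobs that 
have not yet been appended":
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;

public class PruneComponents {

  /**
   * Composes components in blocks of at most 31 (staging blob + 31 sources
   * stays within the 32-source compose limit) and prunes the component id
   * list as it goes.
   */
  static void composeWithPruning(
      List<UUID> componentObjectIds, BiConsumer<List<UUID>, List<UUID>> composeAndPersist) {
    List<UUID> remaining = new ArrayList<>(componentObjectIds);
    while (!remaining.isEmpty()) {
      int blockSize = Math.min(31, remaining.size());
      List<UUID> block = new ArrayList<>(remaining.subList(0, blockSize));
      remaining = new ArrayList<>(remaining.subList(blockSize, remaining.size()));
      // Compose the block into the staging blob, then record what is left;
      // after that the blobs in "block" are safe to delete.
      composeAndPersist.accept(block, remaining);
    }
  }
}
{code}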

*Exception handling:*

I noticed another issue with the code: {{storage.compose}} may throw a 
{{StorageException}}. With the current code, that would leave the intermediate 
composition blobs behind without ever being cleaned up.
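
Something along these lines would ensure temporary blobs are removed even when 
composition fails (a minimal sketch with hypothetical hooks, not the current 
committer code):
{code:java}
import com.google.cloud.storage.StorageException;

public class ComposeWithCleanup {

  /** Hypothetical compose step wrapping storage.compose; may throw StorageException. */
  interface ComposeStep {
    void run();
  }

  /** Hypothetical cleanup hook that deletes the temporary/staging blobs. */
  interface Cleanup {
    void deleteTemporaryBlobs();
  }

  static void composeWithCleanup(ComposeStep compose, Cleanup cleanup) {
    try {
      compose.run();
    } catch (StorageException e) {
      // Composition failed: delete the temporary composition blobs before
      // propagating, so they do not linger and keep accumulating cost.
      cleanup.deleteTemporaryBlobs();
      throw e;
    }
  }
}
{code}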

*5 TB limit*

Regarding the 5 TB limit, temporary staging blobs could offer a solution here 
as well, with each staging blob kept below the object size limit and all of 
them ultimately committed to final blobs once the process completes 
successfully.
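
A tiny sketch of the rollover check this would need (the 5 TiB constant and the 
helper name are assumptions, not existing connector code):
{code:java}
public class StagingRollover {

  // GCS caps a single object at 5 TiB (the "5 TB limit" referred to above).
  static final long MAX_OBJECT_BYTES = 5L * 1024 * 1024 * 1024 * 1024;

  /**
   * True if appending the next block would push the current staging blob past
   * the object size limit, i.e. the staging blob should first be sealed as one
   * of the final blobs and a fresh staging blob started.
   */
  static boolean mustSealStagingBlob(long stagingBytes, long nextBlockBytes) {
    return stagingBytes + nextBlockBytes > MAX_OBJECT_BYTES;
  }
}
{code}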

*TTL*

If the approach above is implemented, I think there is no longer a need for the 
TTL feature, since all necessary blobs end up written to a final blob. Any 
blobs left over after job completion would indicate a failed state.

 


> GSRecoverableWriterCommitter is generating excessive data blobs
> ---------------------------------------------------------------
>
>                 Key: FLINK-34696
>                 URL: https://issues.apache.org/jira/browse/FLINK-34696
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: Simon-Shlomo Poil
>            Priority: Major
>
> The `composeBlobs` method in 
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to 
> merge multiple small blobs into a single large blob using Google Cloud 
> Storage's compose method. This process is iterative, combining the result 
> from the previous iteration with 31 new blobs until all blobs are merged. 
> Upon completion of the composition, the method proceeds to remove the 
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption 
> during the blob composition process, incurring considerable costs due to 
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
>  - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
>  - After 1st step: 32 blobs are merged into a single blob, increasing total 
> storage to 96 GB (64 original + 32 GB new).
>  - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, 
> raising the total to 159 GB.
>  - After 3rd step: The final blob is merged, culminating in a total of 223 GB 
> to combine the original 64 GB of data. This results in an overhead of 159 GB.
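> (Roughly: each intermediate blob re-copies everything composed so far, so 
> with N blobs of equal size the total extra bytes written grow quadratically 
> in N, not linearly with the data size.)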
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes 
> of data can incur overheads in the petabyte range, leading to unexpectedly 
> high costs. Additionally, we have observed an increase in storage exceptions 
> thrown by the Google Storage library, potentially linked to this issue.
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to 
> immediately delete source blobs once they have been successfully combined. 
> This change could significantly reduce data duplication and associated costs. 
> However, the implications for data recovery and integrity need careful 
> consideration to ensure that this optimization does not compromise the 
> ability to recover data in case of a failure during the composition process.
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant 
> number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively 
> combined.
> 3. Note the final amount of data storage used compared to the initial total 
> size of the blobs.
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use, 
> efficiently managing resources to combine blobs without generating excessive 
> temporary data overhead.
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data 
> storage, leading to high costs and potential system instability due to 
> frequent storage exceptions.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
