[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828249#comment-17828249 ]
Simon-Shlomo Poil commented on FLINK-34696:
-------------------------------------------
Dear Galen,
*Composition:* Regarding the requirement that the final blob must not be written
before the end: this can be solved with a staging blob, e.g. like this:
{code:java}
// Fold blocks of component blobs into a single staging blob; the final
// blob is written only by the very last compose call.
GSBlobIdentifier stagingBlob = blobContainer.getOneBlob();
while (blobContainer.hasNext()) {
    // At most 31 new blobs per step: GCS compose takes up to 32 sources,
    // and the staging blob occupies one slot.
    List<GSBlobIdentifier> listOfBlobs = blobContainer.getNextBlock();
    listOfBlobs.add(0, stagingBlob);
    if (blobContainer.lastBlock()) {
        // Last block: compose directly into the final blob.
        Storage.compose(listOfBlobs, finalBlob);
    } else {
        // Otherwise fold this block back into the staging blob.
        Storage.compose(listOfBlobs, stagingBlob);
    }
}
{code}
This approach avoids piling up intermediate composition blobs and therefore at
most doubles the storage footprint (the original components plus the growing
staging blob).
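For reference, the same fold expressed against the actual google-cloud-storage
Java client might look roughly as follows. This is only a sketch under my
assumptions: the bucket name, the {{components}} list of object names, and
reusing the first component as the staging object are placeholders, not the
committer's real state.
{code:java}
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.List;

public class ComposeFoldSketch {
    // Folds 'components' (object names in 'bucket', at least two entries)
    // into 'finalName', reusing the first component as the staging object.
    static void fold(String bucket, List<String> components, String finalName) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        String staging = components.get(0);
        for (int i = 1; i < components.size(); i += 31) {
            int end = Math.min(i + 31, components.size());
            boolean last = end == components.size();
            // GCS permits the compose target to also be one of the sources,
            // so each block can be absorbed into the staging object in place.
            Storage.ComposeRequest request = Storage.ComposeRequest.newBuilder()
                    .setTarget(BlobInfo.newBuilder(bucket, last ? finalName : staging).build())
                    .addSource(staging)
                    .addSource(components.subList(i, end))
                    .build();
            storage.compose(request);
        }
    }
}
{code}
Since the staging object is both a source and the target of each intermediate
compose, at most one temporary object exists at any time.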
*Recovery and deletion of blobs:*
For the recovery and deletion of temporary blobs, it is critical to consider
scenarios without checkpointing, or BATCH mode, where {{closeForCommit}}
operates on the entire blob list. Our current workload, handling roughly 4.5
million blobs, generates a large number of temporary composition blobs. To
streamline this, we could modify {{GSCommitRecoverable}} to update its list of
{{componentObjectIds}}, allowing the removal of blobs that have already been
appended to the {{stagingBlob}}. This adjustment would preserve data integrity
in recovery situations without duplicating blobs; a rough sketch follows below.
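Roughly like this; everything except {{componentObjectIds}} is hypothetical
here, since I have not checked the exact shape of {{GSCommitRecoverable}}:
{code:java}
import java.util.ArrayList;
import java.util.List;

// Called after a block of components has been successfully composed into the
// staging blob (and the originals deleted): drop the folded ids and record the
// staging blob as the new first component, so a recovered committer resumes
// from the staging blob instead of from already-deleted temporaries.
static GSCommitRecoverable pruneAfterCompose(
        GSCommitRecoverable recoverable, GSBlobIdentifier stagingBlob, int foldedCount) {
    List<GSBlobIdentifier> remaining =
            new ArrayList<>(recoverable.getComponentObjectIds()); // assumed getter
    remaining.subList(0, foldedCount).clear(); // remove ids already in the staging blob
    remaining.add(0, stagingBlob);
    return recoverable.withComponentObjectIds(remaining); // hypothetical copy method
}
{code}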
*Exception handling:*
I noticed another issue with the code: {{storage.compose}} might throw a
{{StorageException}}, and with the current code that would leave the
intermediate composition blobs behind without cleanup.
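A minimal sketch of the cleanup, assuming a list that tracks every intermediate
composition blob as soon as it is created (the deletion call is a placeholder
for whatever the committer's storage wrapper provides):
{code:java}
List<GSBlobIdentifier> temporaryBlobs = new ArrayList<>();
try {
    // ... iterative compose steps; every intermediate blob is added to
    // temporaryBlobs immediately after it is created ...
} catch (StorageException e) { // com.google.cloud.storage.StorageException
    throw new IOException("Failed to compose blobs", e);
} finally {
    // Runs on success and failure alike, so intermediates never leak.
    for (GSBlobIdentifier temporary : temporaryBlobs) {
        storage.delete(temporary); // placeholder deletion call
    }
}
{code}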
*5 TB limit:*
Regarding the 5 TB composite-object limit, temporary staging blobs could again
offer a solution: roll over to a fresh staging blob before the limit is
reached, with all staging blobs ultimately committed to final blobs upon
successful completion of the process.
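A sketch of the rollover check, assuming we track the staging blob's current
size (5 TiB is the GCS per-object size limit; all other names are placeholders):
{code:java}
static final long MAX_OBJECT_BYTES = 5L << 40; // 5 TiB GCS object size limit

// Before folding the next block into the staging blob, roll over if the
// result would exceed the object size limit.
if (stagingBytes + blockBytes > MAX_OBJECT_BYTES) {
    sealedStagingBlobs.add(stagingBlob);  // becomes one of the final blobs
    stagingBlob = createNewStagingBlob(); // hypothetical: fresh staging blob
    stagingBytes = 0;
}
stagingBytes += blockBytes;
{code}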
*TTL:*
If the approach above is implemented, I think there is no longer a need for the
TTL feature, since all necessary blobs should end up written into a final blob.
Any leftover blobs after job completion would indicate a failed state.
> GSRecoverableWriterCommitter is generating excessive data blobs
> ---------------------------------------------------------------
>
> Key: FLINK-34696
> URL: https://issues.apache.org/jira/browse/FLINK-34696
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Reporter: Simon-Shlomo Poil
> Priority: Major
>
> The `composeBlobs` method in
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to
> merge multiple small blobs into a single large blob using Google Cloud
> Storage's compose method. This process is iterative, combining the result
> from the previous iteration with 31 new blobs until all blobs are merged.
> Upon completion of the composition, the method proceeds to remove the
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption
> during the blob composition process, incurring considerable costs due to
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
> - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
> - After 1st step: 32 blobs are merged into a single blob, increasing total
> storage to 96 GB (64 GB original + 32 GB new).
> - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs,
> raising the total to 159 GB.
> - After 3rd step: The last remaining blob is merged in, producing the final
> 64 GB blob; in total 223 GB of storage was used to combine the original
> 64 GB of data, an overhead of 159 GB.
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes
> of data can incur overheads in the petabyte range, leading to unexpectedly
> high costs. Additionally, we have observed an increase in storage exceptions
> thrown by the Google Storage library, potentially linked to this issue.
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to
> immediately delete source blobs once they have been successfully combined.
> This change could significantly reduce data duplication and associated costs.
> However, the implications for data recovery and integrity need careful
> consideration to ensure that this optimization does not compromise the
> ability to recover data in case of a failure during the composition process.
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant
> number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively
> combined.
> 3. Note the final amount of data storage used compared to the initial total
> size of the blobs.
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use,
> efficiently managing resources to combine blobs without generating excessive
> temporary data overhead.
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data
> storage, leading to high costs and potential system instability due to
> frequent storage exceptions.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)