[
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828412#comment-17828412
]
Galen Warren edited comment on FLINK-34696 at 3/19/24 5:06 PM:
---------------------------------------------------------------
I think your proposed algorithm is just another way of implementing the "delete
intermediate blobs that are no longer necessary as soon as possible" idea we've
considered. You accomplish it by overwriting the staging blob on each
iteration; a similar effect could be achieved by writing to a new intermediate
staging blob on each iteration (as is done now) and deleting the old one right
away (which is not done now; instead, the deletion occurs shortly thereafter,
once the commit succeeds).
Aside: I'm not sure whether it's possible to overwrite the existing staging
blob like you suggest. The
[docs|https://cloud.google.com/storage/docs/json_api/v1/objects/compose] for
the compose operation say:
{quote}Concatenates a list of existing objects into a new object in the same
bucket. The existing source objects are unaffected by this operation.
{quote}
In your proposal, the same staging blob is both the target of the compose
operation and one of the input blobs to be composed. That _might_ work, but
would have to be tested to see if it's allowed. If it isn't, writing to a new
blob and deleting the old one would have essentially the same effect. That's
almost certainly what happens behind the scenes anyway, since blobs are
immutable.
Bigger picture, if you're trying to combine millions of immutable blobs
together in one step, 32 at a time, I don't see how you avoid having lots of
intermediate composed blobs, one way or another, at least temporarily. The main
question would be how quickly ones that are no longer needed are discarded.
{quote}To streamline this, we could modify {{GSCommitRecoverable}} to update
the list of {{componentObjectIds}}, allowing the removal of blobs already
appended to the {{stagingBlob}}.
{quote}
Not quite following you here; the GSCommitRecoverable is provided as an input
to the GSRecoverableWriterCommitter, providing the list of "raw" (i.e.
uncomposed) temporary blobs that need to be composed and committed. If you
removed those raw blobs from that list as they're added to the staging blob,
what would that accomplish? The GSCommitRecoverable provided to the
GSRecoverableWriterCommitter isn't persisted anywhere after the commit succeeds
or fails, and if the commit were to be retried it would need the complete list
anyway.
{quote}I notice another issue with the code: storage.compose might throw a
StorageException. With the current code this would mean the intermediate
composition blobs are not cleaned up.
If the code above is implemented, I think there is no longer a need for the
TTL feature, since all necessary blobs should be written to a final blob. Any
leftover blobs post-job completion would indicate a failed state.
{quote}
There is no real way to prevent the orphaning of intermediate blobs in all
failure scenarios. Even in your proposed algorithm, the staging blob could be
orphaned if the composition process failed partway through. This is what the
temp bucket and TTL mechanism are for. It's optional, so you don't have to use
it, but yes, you would have to keep an eye out for orphaned intermediate blobs
via some other mechanism if you choose not to use it.
Honestly, I think some of your difficulties are coming from trying to combine
so much data together at once vs. doing it along the way, with commits at
incremental checkpoints. I was going to suggest you reduce your checkpoint
interval, to aggregate more frequently, but obviously that doesn't work if
you're not checkpointing at all.
I do think the RecoverableWriter mechanism is designed to make writing
predictable and repeatable in checkpoint/recovery situations (hence the name);
if you're not doing any of that, you may run into some challenges. If you were
to use checkpoints, you would aggregate more frequently, meaning fewer
intermediate blobs with shorter lifetimes, and you'd be less likely to run up
against the 5TB limit, as you would end up with a series of smaller files,
written at each checkpoint, vs. one huge file.
If we do want to consider changes, here's what I suggest:
* Test whether GCP allows composition to overwrite an existing blob that is
also an input to the compose operation. If that works, then we could use that
technique in the composeBlobs function. As mentioned above, I doubt that this
actually reduces the number of blobs that exist temporarily – since blobs are
immutable objects – but it would minimize their lifetime, effectively deleting
them as soon as possible. If overwriting doesn't work, then we could achieve a
similar effect by deleting intermediate blobs as soon as they are not needed
anymore, rather than waiting until the commit succeeds.
* Investigate how other RecoverableWriter implementations handle blob-size
limitations. For example, S3 has the same 5TB limit. As we've discussed, there
would be ways to write multiple files in such scenarios, but we'd have to give
up a bit of atomicity that is assumed in the design. I'm not really sure
whether that would be allowed or if the answer is "commit frequently enough to
keep the size of final blobs under 5TB." Someone else from the Flink team would
have to weigh in on that.
* Consider a change to the composing algorithm that I'll describe below, which
optimizes for total bytes written/composed vs. number of compose operations.
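To make the first bullet concrete, here is a rough Python sketch of a chain
compose that discards each staging blob as soon as its successor exists.
Everything here is hypothetical: {{FakeStorage}} is an in-memory stand-in for
the GCS client (real code would call the client's compose and delete
operations), and {{chain_compose}} is not the actual composeBlobs
implementation.

```python
class FakeStorage:
    """In-memory stand-in for a blob store. `compose` mimics GCS
    semantics: it concatenates 1..32 source blobs into a destination."""
    COMPOSE_LIMIT = 32

    def __init__(self, blobs):
        self.blobs = dict(blobs)  # name -> contents (bytes)

    def compose(self, sources, destination):
        assert 1 <= len(sources) <= self.COMPOSE_LIMIT
        self.blobs[destination] = b"".join(self.blobs[s] for s in sources)

    def delete(self, name):
        del self.blobs[name]


def chain_compose(storage, raw_blobs, final_name):
    """Compose raw_blobs into final_name, deleting each staging blob as
    soon as the next compose succeeds. The raw temporary blobs are
    retained, since a retried commit would need them."""
    assert raw_blobs
    limit = storage.COMPOSE_LIMIT
    staging, step = None, 0
    remaining = list(raw_blobs)
    while remaining:
        # First compose takes `limit` blobs; later ones reserve a slot
        # for the current staging blob.
        take = limit if staging is None else limit - 1
        batch, remaining = remaining[:take], remaining[take:]
        new_staging = f"{final_name}.staging.{step}"
        sources = ([staging] if staging else []) + batch
        storage.compose(sources, new_staging)
        if staging is not None:
            storage.delete(staging)  # discard the old staging blob now
        staging, step = new_staging, step + 1
    # Promote the last staging blob to the final blob and clean up.
    storage.compose([staging], final_name)
    storage.delete(staging)
```

Only the intermediate staging blobs are deleted eagerly; deleting the raw
blobs is still deferred until after the commit, for the retry reasons
discussed above.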
Possible alternate algorithm:
Consider the case where there are 125 raw temporary blobs to be aggregated,
each of size 1GB. The current method (and your proposed method) goes like this:
* Combine the first 32 blobs into 1 blob – 1 compose operation, 32GB written
into a new blob
* Combine that blob with the next 31 blobs – 1 compose operation, 31GB + 32GB
= 63GB written into a new blob
* Combine that blob with the next 31 blobs – 1 compose operation, 31GB + 63GB
= 94GB written into a new blob
* Combine that blob with the next 31 blobs – 1 compose operation, 31GB + 94GB
= 125GB written into a new blob
So, a total of 4 compose operations and 32GB + 63GB + 94GB + 125GB = 314GB
written/composed.
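As a sanity check on this arithmetic, here is a tiny Python cost model.
{{sequential_compose_cost}} is a hypothetical helper, not connector code; it
assumes equal-size 1GB blobs and GCS's 32-source compose limit.

```python
def sequential_compose_cost(num_blobs, blob_gb=1, limit=32):
    """Model the existing chain approach: compose the first `limit`
    blobs, then repeatedly compose the running result with the next
    `limit - 1` blobs. Returns (compose_ops, total_gb_written)."""
    ops = written = staged = 0
    remaining = num_blobs
    while remaining:
        batch = min(remaining, limit if staged == 0 else limit - 1)
        staged += batch * blob_gb
        written += staged  # each compose rewrites the entire staging blob
        ops += 1
        remaining -= batch
    return ops, written
```

For 125 blobs this gives 4 operations and 314GB, matching the numbers above;
for 64 blobs it gives 159GB of newly written data, matching the overhead in
the issue description's example.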
Alternately, we could do this:
* Combine the first 32 blobs into 1 intermediate blob – 1 compose operation,
32GB written into a new blob
* Combine the second 32 blobs into 1 intermediate blob – 1 compose operation,
32GB written into a new blob
* Combine the third 32 blobs into 1 intermediate blob – 1 compose operation,
32GB written into a new blob
* Combine the last 29 blobs into 1 intermediate blob – 1 compose operation,
29GB written into a new blob
* Combine the 4 intermediate blobs into one final blob – 1 compose operation,
32GB + 32GB + 32GB + 29GB = 125GB written into a new blob
This method has 5 compose operations – one more than the existing way – but
only writes a total of 32GB + 32GB + 32GB + 29GB + 125GB = 250GB. So, more
compose operations but less intermediate data and total data written.
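The same kind of toy model for the tree-style alternative (again a
hypothetical sketch under the same assumptions, not connector code):

```python
def tree_compose_cost(num_blobs, blob_gb=1, limit=32):
    """Model the tree alternative: compose blobs in groups of up to
    `limit` per level, repeating until a single blob remains.
    Returns (compose_ops, total_gb_written)."""
    ops, written = 0, 0
    level = [blob_gb] * num_blobs  # blob sizes at the current level
    while len(level) > 1:
        next_level = []
        for i in range(0, len(level), limit):
            group = sum(level[i:i + limit])
            next_level.append(group)
            written += group  # the compose writes the group's combined size
            ops += 1
        level = next_level
    return ops, written
```

For 125 blobs this gives 5 operations and 250GB, matching the walkthrough
above.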
EDIT: Assuming I did my math right: If N is the total number of raw temporary
blobs to aggregate, and they're all the same size, the total volume of data
written in the first algorithm (existing) is approximately proportional to N^2
as compared to N * log N for the second algorithm. My guess is that would have
the biggest impact in high-volume scenarios. Overwriting the same staging blob
would be moot in this case. We could still delete intermediate blobs along the
way, just not the raw temporary blobs, which have to be retained.
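The N^2 vs. N log N claim can be sanity-checked numerically with the same kind
of toy model (hypothetical helpers; equal 1GB blobs, 32-source limit). If the
growth rates hold, sequential cost divided by N^2 and tree cost divided by
N * log_32(N) should each stay roughly constant as N grows:

```python
import math

def sequential_cost(n, limit=32):
    # Total GB written by the existing chain approach (1GB blobs).
    written = staged = 0
    remaining = n
    while remaining:
        batch = min(remaining, limit if staged == 0 else limit - 1)
        staged += batch
        written += staged
        remaining -= batch
    return written

def tree_cost(n, limit=32):
    # Total GB written by the tree approach (1GB blobs).
    written, level = 0, [1] * n
    while len(level) > 1:
        nxt = []
        for i in range(0, len(level), limit):
            group = sum(level[i:i + limit])
            nxt.append(group)
            written += group
        level = nxt
    return written

# Ratios should be roughly constant if the claimed growth rates hold.
seq_ratios = [sequential_cost(n) / n ** 2 for n in (1024, 32768)]
tree_ratios = [tree_cost(n) / (n * math.log(n, 32)) for n in (1024, 32768)]
```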
> GSRecoverableWriterCommitter is generating excessive data blobs
> ---------------------------------------------------------------
>
> Key: FLINK-34696
> URL: https://issues.apache.org/jira/browse/FLINK-34696
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Reporter: Simon-Shlomo Poil
> Priority: Major
>
> The `composeBlobs` method in
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to
> merge multiple small blobs into a single large blob using Google Cloud
> Storage's compose method. This process is iterative, combining the result
> from the previous iteration with 31 new blobs until all blobs are merged.
> Upon completion of the composition, the method proceeds to remove the
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption
> during the blob composition process, incurring considerable costs due to
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
> - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
> - After 1st step: 32 blobs are merged into a single blob, increasing total
> storage to 96 GB (64 original + 32 GB new).
> - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs,
> raising the total to 159 GB.
> - After 3rd step: The final blob is merged, culminating in a total of 223 GB
> to combine the original 64 GB of data. This results in an overhead of 159 GB.
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes
> of data can incur overheads in the petabyte range, leading to unexpectedly
> high costs. Additionally, we have observed an increase in storage exceptions
> thrown by the Google Storage library, potentially linked to this issue.
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to
> immediately delete source blobs once they have been successfully combined.
> This change could significantly reduce data duplication and associated costs.
> However, the implications for data recovery and integrity need careful
> consideration to ensure that this optimization does not compromise the
> ability to recover data in case of a failure during the composition process.
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant
> number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively
> combined.
> 3. Note the final amount of data storage used compared to the initial total
> size of the blobs.
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use,
> efficiently managing resources to combine blobs without generating excessive
> temporary data overhead.
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data
> storage, leading to high costs and potential system instability due to
> frequent storage exceptions.