arjun4084346 commented on code in PR #3883:
URL: https://github.com/apache/gobblin/pull/3883#discussion_r1512185406
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java:
##########
@@ -275,42 +282,55 @@ public void clean() throws IOException {
this.log.info(String.format("Cleaning dataset %s. Using version finder
%s and policy %s", this,
versionFinder.getClass().getName(), selectionPolicy));
-
- List<T> versions =
Lists.newArrayList(versionFinder.findDatasetVersions(this));
-
- if (versions.isEmpty()) {
- this.log.warn("No dataset version can be found. Ignoring.");
- continue;
- }
-
- Collections.sort(versions, Collections.reverseOrder());
-
- Collection<T> deletableVersions =
selectionPolicy.listSelectedVersions(versions);
-
- cleanImpl(deletableVersions);
-
- List<DatasetVersion> allVersions = Lists.newArrayList();
- for (T ver : versions) {
- allVersions.add(ver);
- }
- for (RetentionAction retentionAction :
versionFinderAndPolicy.getRetentionActions()) {
- try {
- retentionAction.execute(allVersions);
- } catch (Throwable t) {
- atLeastOneFailureSeen = true;
- log.error(String.format("RetentionAction %s failed for dataset %s",
retentionAction.getClass().getName(),
- this.datasetRoot()), t);
+ // Avoiding OOM by iterating instead of loading all the datasetVersions
in memory
+ RemoteIterator<? extends T> versionRemoteIterator =
versionFinder.findDatasetVersion(this);
+ // Cleaning Dataset versions in batch of CLEANABLE_DATASET_BATCH_SIZE to
avoid OOM
+ List<T> cleanableVersionsBatch = new ArrayList<>();
+ while (versionRemoteIterator.hasNext()) {
+ T version = versionRemoteIterator.next();
+ cleanableVersionsBatch.add(version);
+ if (cleanableVersionsBatch.size() >= CLEANABLE_DATASET_BATCH_SIZE) {
+ boolean isCleanSuccess =
+ cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy,
versionFinderAndPolicy);
+ atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
}
}
+ if (!cleanableVersionsBatch.isEmpty()) {
+ boolean isCleanSuccess = cleanDatasetVersions(cleanableVersionsBatch,
selectionPolicy, versionFinderAndPolicy);
+ atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
+ }
}
if (atLeastOneFailureSeen) {
- throw new RuntimeException(String.format(
- "At least one failure happened while processing %s. Look for
previous logs for failures", datasetRoot()));
+ throw new RuntimeException(
+ String.format("At least one failure happened while processing %s.
Look for previous logs for failures",
+ datasetRoot()));
}
}
- protected void cleanImpl(Collection<T> deletableVersions) throws IOException
{
+ private boolean cleanDatasetVersions(List<T> versions,
VersionSelectionPolicy<T> selectionPolicy,
+ VersionFinderAndPolicy<T> versionFinderAndPolicy)
+ throws IOException {
+ boolean isCleanSuccess = true;
+ Collections.sort(versions, Collections.reverseOrder());
+ Collection<T> deletableVersions =
selectionPolicy.listSelectedVersions(versions);
Review Comment:
Sorting a subset of the dataset versions and then calling `listSelectedVersions`
on it should be okay for most of the selection policies. But for
NewestKSelectionPolicy, we need to supply it with the complete list of sorted
dataset versions so that it can find the absolute K newest ones.
The second problem with using this logic with NewestKSelectionPolicy is that it
may delete K datasets once per batch, effectively cleaning m x K datasets
(where m is the number of batches).
What are your thoughts on tackling this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]