arjun4084346 commented on code in PR #3883:
URL: https://github.com/apache/gobblin/pull/3883#discussion_r1502673092
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java:
##########
@@ -275,39 +278,48 @@ public void clean() throws IOException {
this.log.info(String.format("Cleaning dataset %s. Using version finder
%s and policy %s", this,
versionFinder.getClass().getName(), selectionPolicy));
-
- List<T> versions =
Lists.newArrayList(versionFinder.findDatasetVersions(this));
-
- if (versions.isEmpty()) {
- this.log.warn("No dataset version can be found. Ignoring.");
- continue;
- }
-
- Collections.sort(versions, Collections.reverseOrder());
-
- Collection<T> deletableVersions =
selectionPolicy.listSelectedVersions(versions);
-
- cleanImpl(deletableVersions);
-
- List<DatasetVersion> allVersions = Lists.newArrayList();
- for (T ver : versions) {
- allVersions.add(ver);
- }
- for (RetentionAction retentionAction :
versionFinderAndPolicy.getRetentionActions()) {
- try {
- retentionAction.execute(allVersions);
- } catch (Throwable t) {
- atLeastOneFailureSeen = true;
- log.error(String.format("RetentionAction %s failed for dataset %s",
retentionAction.getClass().getName(),
- this.datasetRoot()), t);
+ // Avoiding OOM by iterating instead of loading all the datasetVersions
in memory
+ RemoteIterator<? extends T> versionRemoteIterator =
versionFinder.findDatasetVersion(this);
+ // Cleaning Dataset versions in batch of CLEANABLE_DATASET_BATCH_SIZE to
avoid OOM
+ List<T> cleanableVersionsBatch = new ArrayList<>();
+ while (versionRemoteIterator.hasNext()) {
+ T version = versionRemoteIterator.next();
+ cleanableVersionsBatch.add(version);
+ if (cleanableVersionsBatch.size() >= CLEANABLE_DATASET_BATCH_SIZE) {
+ boolean isCleanSuccess =
cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy,
versionFinderAndPolicy);
+ atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
}
}
+ if (!cleanableVersionsBatch.isEmpty()) {
+ boolean isCleanSuccess = cleanDatasetVersions(cleanableVersionsBatch,
selectionPolicy, versionFinderAndPolicy);
+ atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
+ }
}
if (atLeastOneFailureSeen) {
- throw new RuntimeException(String.format(
- "At least one failure happened while processing %s. Look for
previous logs for failures", datasetRoot()));
+ throw new RuntimeException(
+ String.format("At least one failure happened while processing %s.
Look for previous logs for failures",
+ datasetRoot()));
+ }
+ }
+
+ private boolean cleanDatasetVersions(List<T> versions,
VersionSelectionPolicy<T> selectionPolicy,
+ VersionFinderAndPolicy<T> versionFinderAndPolicy) throws IOException {
+ Collections.sort(versions, Collections.reverseOrder());
+ Collection<T> deletableVersions =
selectionPolicy.listSelectedVersions(versions);
+ cleanImpl(deletableVersions);
+ List<DatasetVersion> allVersions = Lists.newArrayList(versions);
+ for (RetentionAction retentionAction :
versionFinderAndPolicy.getRetentionActions()) {
+ try {
+ retentionAction.execute(allVersions);
+ } catch (Throwable t) {
+ log.error(String.format("RetentionAction %s failed for dataset %s",
retentionAction.getClass().getName(),
+ this.datasetRoot()), t);
+ return false;
Review Comment:
Returning `false` inside the `catch` exits the loop on the first failed
retention action. We still need to try the other retention actions on this
dataset, and also remove this version from `versions` before returning, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org