arjun4084346 commented on code in PR #3883:
URL: https://github.com/apache/gobblin/pull/3883#discussion_r1512185917


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java:
##########
@@ -275,42 +282,55 @@ public void clean() throws IOException {
 
       this.log.info(String.format("Cleaning dataset %s. Using version finder 
%s and policy %s", this,
           versionFinder.getClass().getName(), selectionPolicy));
-
-      List<T> versions = 
Lists.newArrayList(versionFinder.findDatasetVersions(this));
-
-      if (versions.isEmpty()) {
-        this.log.warn("No dataset version can be found. Ignoring.");
-        continue;
-      }
-
-      Collections.sort(versions, Collections.reverseOrder());
-
-      Collection<T> deletableVersions = 
selectionPolicy.listSelectedVersions(versions);
-
-      cleanImpl(deletableVersions);
-
-      List<DatasetVersion> allVersions = Lists.newArrayList();
-      for (T ver : versions) {
-        allVersions.add(ver);
-      }
-      for (RetentionAction retentionAction : 
versionFinderAndPolicy.getRetentionActions()) {
-        try {
-          retentionAction.execute(allVersions);
-        } catch (Throwable t) {
-          atLeastOneFailureSeen = true;
-          log.error(String.format("RetentionAction %s failed for dataset %s", 
retentionAction.getClass().getName(),
-                  this.datasetRoot()), t);
+      // Avoiding OOM by iterating instead of loading all the datasetVersions 
in memory
+      RemoteIterator<? extends T> versionRemoteIterator = 
versionFinder.findDatasetVersion(this);
+      // Cleaning Dataset versions in batch of CLEANABLE_DATASET_BATCH_SIZE to 
avoid OOM
+      List<T> cleanableVersionsBatch = new ArrayList<>();
+      while (versionRemoteIterator.hasNext()) {
+        T version = versionRemoteIterator.next();
+        cleanableVersionsBatch.add(version);
+        if (cleanableVersionsBatch.size() >= CLEANABLE_DATASET_BATCH_SIZE) {
+          boolean isCleanSuccess =
+              cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy, 
versionFinderAndPolicy);
+          atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
         }
       }
+      if (!cleanableVersionsBatch.isEmpty()) {
+        boolean isCleanSuccess = cleanDatasetVersions(cleanableVersionsBatch, 
selectionPolicy, versionFinderAndPolicy);
+        atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
+      }
     }
 
     if (atLeastOneFailureSeen) {
-      throw new RuntimeException(String.format(
-          "At least one failure happened while processing %s. Look for 
previous logs for failures", datasetRoot()));
+      throw new RuntimeException(
+          String.format("At least one failure happened while processing %s. 
Look for previous logs for failures",
+              datasetRoot()));
     }
   }
 
-  protected void cleanImpl(Collection<T> deletableVersions) throws IOException 
{
+  private boolean cleanDatasetVersions(List<T> versions, 
VersionSelectionPolicy<T> selectionPolicy,
+      VersionFinderAndPolicy<T> versionFinderAndPolicy)
+      throws IOException {
+    boolean isCleanSuccess = true;
+    Collections.sort(versions, Collections.reverseOrder());
+    Collection<T> deletableVersions = 
selectionPolicy.listSelectedVersions(versions);

Review Comment:
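   Which selection policy were you using when this OOM was detected? I ask because this change sorts and applies `listSelectedVersions` to each batch separately instead of to the full version list. That changes the result for any policy that depends on global ordering: for example, a policy that retains the newest N versions would now retain N versions per batch. Also, unless `cleanDatasetVersions` clears it internally, `cleanableVersionsBatch` does not seem to be reset after each batch in the loop above. If so, the list would keep growing, and earlier versions would be re-processed on every subsequent iteration.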



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to