arpit09 commented on code in PR #3883:
URL: https://github.com/apache/gobblin/pull/3883#discussion_r1512388680


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java:
##########
@@ -275,42 +282,55 @@ public void clean() throws IOException {
 
       this.log.info(String.format("Cleaning dataset %s. Using version finder 
%s and policy %s", this,
           versionFinder.getClass().getName(), selectionPolicy));
-
-      List<T> versions = 
Lists.newArrayList(versionFinder.findDatasetVersions(this));
-
-      if (versions.isEmpty()) {
-        this.log.warn("No dataset version can be found. Ignoring.");
-        continue;
-      }
-
-      Collections.sort(versions, Collections.reverseOrder());
-
-      Collection<T> deletableVersions = 
selectionPolicy.listSelectedVersions(versions);
-
-      cleanImpl(deletableVersions);
-
-      List<DatasetVersion> allVersions = Lists.newArrayList();
-      for (T ver : versions) {
-        allVersions.add(ver);
-      }
-      for (RetentionAction retentionAction : 
versionFinderAndPolicy.getRetentionActions()) {
-        try {
-          retentionAction.execute(allVersions);
-        } catch (Throwable t) {
-          atLeastOneFailureSeen = true;
-          log.error(String.format("RetentionAction %s failed for dataset %s", 
retentionAction.getClass().getName(),
-                  this.datasetRoot()), t);
+      // Avoiding OOM by iterating instead of loading all the datasetVersions 
in memory
+      RemoteIterator<? extends T> versionRemoteIterator = 
versionFinder.findDatasetVersion(this);
+      // Cleaning Dataset versions in batch of CLEANABLE_DATASET_BATCH_SIZE to 
avoid OOM
+      List<T> cleanableVersionsBatch = new ArrayList<>();
+      while (versionRemoteIterator.hasNext()) {
+        T version = versionRemoteIterator.next();
+        cleanableVersionsBatch.add(version);
+        if (cleanableVersionsBatch.size() >= CLEANABLE_DATASET_BATCH_SIZE) {
+          boolean isCleanSuccess =
+              cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy, 
versionFinderAndPolicy);
+          atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
         }
       }
+      if (!cleanableVersionsBatch.isEmpty()) {
+        boolean isCleanSuccess = cleanDatasetVersions(cleanableVersionsBatch, 
selectionPolicy, versionFinderAndPolicy);
+        atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
+      }
     }
 
     if (atLeastOneFailureSeen) {
-      throw new RuntimeException(String.format(
-          "At least one failure happened while processing %s. Look for 
previous logs for failures", datasetRoot()));
+      throw new RuntimeException(
+          String.format("At least one failure happened while processing %s. 
Look for previous logs for failures",
+              datasetRoot()));
     }
   }
 
-  protected void cleanImpl(Collection<T> deletableVersions) throws IOException 
{
+  private boolean cleanDatasetVersions(List<T> versions, 
VersionSelectionPolicy<T> selectionPolicy,
+      VersionFinderAndPolicy<T> versionFinderAndPolicy)
+      throws IOException {
+    boolean isCleanSuccess = true;
+    Collections.sort(versions, Collections.reverseOrder());
+    Collection<T> deletableVersions = 
selectionPolicy.listSelectedVersions(versions);

Review Comment:
   In the OOM case, the selection policy excluded 2 items, in combination with
SelectTimeBeforePolicy (time < 3 days old).
   For the iterator-based approach, NewestKSelectionPolicy doesn't add any
value, because it needs all the data in memory, which is exactly what we want to
avoid in our case. We can add a config that lets the client choose between the
iterator-based approach and the list-based approach. If the iterator-based
approach is chosen together with NewestKSelectionPolicy, we can throw a runtime
error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to