This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ef88a58cc [GOBBLIN-2006] Adding an iterator for fetching dataset versions  (#3883)
ef88a58cc is described below

commit ef88a58cc8663947ace68cd84694ace2359ccf12
Author: Arpit Varshney <[email protected]>
AuthorDate: Wed Mar 13 12:06:23 2024 +0530

    [GOBBLIN-2006] Adding an iterator for fetching dataset versions  (#3883)
    
    * Add iterator to fetch data version
    * Address review comments
    * Move Iterator to config driven
    ---------
    Co-authored-by: Arpit Varshney <[email protected]>
---
 .../management/policy/NewestKSelectionPolicy.java  |  18 ++-
 .../dataset/MultiVersionCleanableDatasetBase.java  | 150 +++++++++++++--------
 .../finder/GlobModTimeDatasetVersionFinder.java    |  17 ++-
 .../finder/AbstractDatasetVersionFinder.java       | 128 ++++++++++++++++--
 .../finder/GlobModTimeDatasetVersionFinder.java    |  22 ++-
 .../management/version/finder/VersionFinder.java   |  27 +++-
 .../policy/NewestKSelectionPolicyTest.java         | 136 +++++++++----------
 .../retention/CleanableDatasetBaseTest.java        |  78 +++++++----
 .../integration/RetentionIntegrationTest.java      |   4 +-
 .../finder/AbstractDatasetVersionFinderTest.java   |  83 ++++++++++++
 .../retention.conf                                 |  10 ++
 .../selection.conf                                 |  18 +++
 .../setup_validate.conf                            |  22 +++
 13 files changed, 539 insertions(+), 174 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicy.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicy.java
index 5d25a6b1b..cf79ecb18 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicy.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicy.java
@@ -33,10 +33,12 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.data.management.version.DatasetVersion;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
  * Select the newest k versions of the dataset.
+ * This policy cannot be used when dataset versions are fetched through an iterator (see SHOULD_ITERATE_VERSIONS).
  */
 @ToString
 public class NewestKSelectionPolicy<T extends DatasetVersion> implements 
VersionSelectionPolicy<T> {
@@ -61,6 +63,14 @@ public class NewestKSelectionPolicy<T extends 
DatasetVersion> implements Version
    */
   public static final String NEWEST_K_VERSIONS_NOTSELECTED_KEY = 
"selection.newestK.versionsNotSelected";
 
+  /**
+   * Config key that makes dataset versions be fetched through an iterator instead of being loaded into memory all at once.
+   * Set it to true for datasets with many versions; when unset (default false), all versions are loaded in memory, which
+   * may cause an OutOfMemoryError.
+   * When true, NewestKSelectionPolicy cannot be used because it needs every version of the dataset; createFromConfig throws.
+   */
+  public static final String SHOULD_ITERATE_VERSIONS = 
"version.should.iterate";
+
   public static final Integer VERSIONS_SELECTED_DEFAULT = 2;
 
   public static final Integer MAX_VERSIONS_ALLOWED = 1000000;
@@ -79,10 +89,14 @@ public class NewestKSelectionPolicy<T extends 
DatasetVersion> implements Version
     }
 
     static Params createFromConfig(Config config) {
+      if (ConfigUtils.getBoolean(config, SHOULD_ITERATE_VERSIONS, false)) {
+        throw new RuntimeException("NewestKSelection policy can't be used with 
an iterator for finding versions");
+      }
       if (config.hasPath(NEWEST_K_VERSIONS_SELECTED_KEY)) {
         if (config.hasPath(NEWEST_K_VERSIONS_NOTSELECTED_KEY)) {
-          throw new RuntimeException("Only one of " + 
NEWEST_K_VERSIONS_SELECTED_KEY + " and "
-              + NEWEST_K_VERSIONS_NOTSELECTED_KEY + " can be specified.");
+          throw new RuntimeException(
+              "Only one of " + NEWEST_K_VERSIONS_SELECTED_KEY + " and " + 
NEWEST_K_VERSIONS_NOTSELECTED_KEY
+                  + " can be specified.");
         }
         return new Params(config.getInt(NEWEST_K_VERSIONS_SELECTED_KEY), 
false);
       } else if (config.hasPath(NEWEST_K_VERSIONS_NOTSELECTED_KEY)) {
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
index 862082230..5c6f9710f 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
@@ -18,17 +18,14 @@
 package org.apache.gobblin.data.management.retention.dataset;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.Singular;
-
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -37,6 +34,11 @@ import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Singular;
+
 import 
org.apache.gobblin.data.management.policy.EmbeddedRetentionSelectionPolicy;
 import org.apache.gobblin.data.management.policy.SelectNothingPolicy;
 import org.apache.gobblin.data.management.policy.VersionSelectionPolicy;
@@ -105,8 +107,7 @@ import org.apache.gobblin.util.ConfigUtils;
  *
  * @param <T> type of {@link FileSystemDatasetVersion} supported by this 
{@link CleanableDataset}.
  */
-public abstract class MultiVersionCleanableDatasetBase<T extends 
FileSystemDatasetVersion>
-    implements CleanableDataset, FileSystemDataset {
+public abstract class MultiVersionCleanableDatasetBase<T extends 
FileSystemDatasetVersion> implements CleanableDataset, FileSystemDataset {
 
   /**
    * @deprecated in favor of {@link FsCleanableHelper}
@@ -114,6 +115,8 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
   @Deprecated
   public static final String CONFIGURATION_KEY_PREFIX = 
FsCleanableHelper.CONFIGURATION_KEY_PREFIX;
 
+  public static final Integer CLEANABLE_DATASET_BATCH_SIZE = 100;
+
   /**
    * @deprecated in favor of {@link FsCleanableHelper}
    */
@@ -181,6 +184,7 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
    */
   @Deprecated
   protected final boolean deleteAsOwner;
+
   /**
    * Get {@link 
org.apache.gobblin.data.management.retention.policy.RetentionPolicy} to use.
    */
@@ -192,15 +196,17 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
         Boolean.valueOf(props.getProperty(SKIP_TRASH_KEY, SKIP_TRASH_DEFAULT)),
         Boolean.valueOf(props.getProperty(DELETE_EMPTY_DIRECTORIES_KEY, 
DELETE_EMPTY_DIRECTORIES_DEFAULT)),
         Boolean.valueOf(props.getProperty(DELETE_AS_OWNER_KEY, 
DELETE_AS_OWNER_DEFAULT)),
-        ConfigUtils.getBoolean(config, IS_DATASET_BLACKLISTED_KEY, 
Boolean.valueOf(IS_DATASET_BLACKLISTED_DEFAULT)), log);
+        ConfigUtils.getBoolean(config, IS_DATASET_BLACKLISTED_KEY, 
Boolean.valueOf(IS_DATASET_BLACKLISTED_DEFAULT)),
+        log);
   }
 
-  public MultiVersionCleanableDatasetBase(final FileSystem fs, final 
Properties props, Logger log) throws IOException {
+  public MultiVersionCleanableDatasetBase(final FileSystem fs, final 
Properties props, Logger log)
+      throws IOException {
     // This constructor is used by retention jobs configured through job 
configs and do not use dataset configs from config store.
     // IS_DATASET_BLACKLISTED_KEY is only available with dataset config. Hence 
set IS_DATASET_BLACKLISTED_KEY to default
     // ...false for jobs running with job configs
-    this(fs, props, ConfigFactory.parseMap(ImmutableMap.<String, String> 
of(IS_DATASET_BLACKLISTED_KEY,
-        IS_DATASET_BLACKLISTED_DEFAULT)), log);
+    this(fs, props, ConfigFactory.parseMap(
+        ImmutableMap.<String, String>of(IS_DATASET_BLACKLISTED_KEY, 
IS_DATASET_BLACKLISTED_DEFAULT)), log);
   }
 
   /**
@@ -220,7 +226,8 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
       boolean deleteEmptyDirectories, boolean deleteAsOwner, boolean 
isDatasetBlacklisted, Logger log)
       throws IOException {
     this.log = log;
-    this.fsCleanableHelper = new FsCleanableHelper(fs, properties, simulate, 
skipTrash, deleteEmptyDirectories, deleteAsOwner, log);
+    this.fsCleanableHelper =
+        new FsCleanableHelper(fs, properties, simulate, skipTrash, 
deleteEmptyDirectories, deleteAsOwner, log);
     this.fs = fs;
     this.simulate = simulate;
     this.skipTrash = skipTrash;
@@ -228,11 +235,11 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
     this.trash = this.fsCleanableHelper.getTrash();
     this.deleteAsOwner = deleteAsOwner;
     this.isDatasetBlacklisted = isDatasetBlacklisted;
-
   }
 
   public MultiVersionCleanableDatasetBase(FileSystem fs, Properties 
properties, boolean simulate, boolean skipTrash,
-      boolean deleteEmptyDirectories, boolean deleteAsOwner, Logger log) 
throws IOException {
+      boolean deleteEmptyDirectories, boolean deleteAsOwner, Logger log)
+      throws IOException {
     this(fs, properties, simulate, skipTrash, deleteEmptyDirectories, 
deleteAsOwner,
         Boolean.parseBoolean(IS_DATASET_BLACKLISTED_DEFAULT), log);
   }
@@ -241,21 +248,22 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
    * Method to perform the Retention operations for this dataset.
    *
    *<ul>
-  * <li>{@link 
MultiVersionCleanableDatasetBase#getVersionFindersAndPolicies()} gets a list 
{@link VersionFinderAndPolicy}s
-  * <li>Each {@link VersionFinderAndPolicy} contains a {@link VersionFinder} 
and a {@link VersionSelectionPolicy}. It can
-  * optionally have a {@link RetentionAction}
-  * <li>The {@link MultiVersionCleanableDatasetBase#clean()} method finds all 
the {@link FileSystemDatasetVersion}s using
-  * {@link VersionFinderAndPolicy#versionFinder}
-  * <li> It gets the deletable {@link FileSystemDatasetVersion}s by applying 
{@link VersionFinderAndPolicy#versionSelectionPolicy}.
-  * These deletable version are deleted  and then deletes empty parent 
directories.
-  * <li>If additional retention actions are available at {@link 
VersionFinderAndPolicy#getRetentionActions()}, all versions
-  * found by the {@link VersionFinderAndPolicy#versionFinder} are passed to 
{@link RetentionAction#execute(List)} for
-  * each {@link RetentionAction}
+   * <li>{@link 
MultiVersionCleanableDatasetBase#getVersionFindersAndPolicies()} gets a list 
{@link VersionFinderAndPolicy}s
+   * <li>Each {@link VersionFinderAndPolicy} contains a {@link VersionFinder} 
and a {@link VersionSelectionPolicy}. It can
+   * optionally have a {@link RetentionAction}
+   * <li>The {@link MultiVersionCleanableDatasetBase#clean()} method finds all 
the {@link FileSystemDatasetVersion}s using
+   * {@link VersionFinderAndPolicy#versionFinder}
+   * <li> It gets the deletable {@link FileSystemDatasetVersion}s by applying 
{@link VersionFinderAndPolicy#versionSelectionPolicy}.
+   * These deletable version are deleted  and then deletes empty parent 
directories.
+   * <li>If additional retention actions are available at {@link 
VersionFinderAndPolicy#getRetentionActions()}, all versions
+   * found by the {@link VersionFinderAndPolicy#versionFinder} are passed to 
{@link RetentionAction#execute(List)} for
+   * each {@link RetentionAction}
    * </ul>
    *
    */
   @Override
-  public void clean() throws IOException {
+  public void clean()
+      throws IOException {
 
     if (this.isDatasetBlacklisted) {
       this.log.info("Dataset blacklisted. Cleanup skipped for " + 
datasetRoot());
@@ -265,7 +273,6 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
     boolean atLeastOneFailureSeen = false;
 
     for (VersionFinderAndPolicy<T> versionFinderAndPolicy : 
getVersionFindersAndPolicies()) {
-
       VersionSelectionPolicy<T> selectionPolicy = 
versionFinderAndPolicy.getVersionSelectionPolicy();
       VersionFinder<? extends T> versionFinder = 
versionFinderAndPolicy.getVersionFinder();
 
@@ -275,42 +282,68 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
 
       this.log.info(String.format("Cleaning dataset %s. Using version finder 
%s and policy %s", this,
           versionFinder.getClass().getName(), selectionPolicy));
+      if (versionFinder.useIteratorForFindingVersions()) {
+        // Avoid OOM by streaming versions from an iterator instead of loading all dataset versions into memory
+        RemoteIterator<? extends T> versionRemoteIterator = 
versionFinder.findDatasetVersion(this);
+        // Clean versions in batches of CLEANABLE_DATASET_BATCH_SIZE to avoid OOM; the selection policy sees one batch at a time
+        List<T> cleanableVersionsBatch = new ArrayList<>();
+        while (versionRemoteIterator.hasNext()) {
+          T version = versionRemoteIterator.next();
+          cleanableVersionsBatch.add(version);
+          if (cleanableVersionsBatch.size() >= CLEANABLE_DATASET_BATCH_SIZE) {
+            boolean isCleanSuccess = 
cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy,
+                versionFinderAndPolicy.getRetentionActions());
+            atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
+          }
+        }
+        if (!cleanableVersionsBatch.isEmpty()) {
+          boolean isCleanSuccess = 
cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy,
+              versionFinderAndPolicy.getRetentionActions());
+          atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
+        }
+      } else {
+        List<T> versions = 
Lists.newArrayList(versionFinder.findDatasetVersions(this));
 
-      List<T> versions = 
Lists.newArrayList(versionFinder.findDatasetVersions(this));
-
-      if (versions.isEmpty()) {
-        this.log.warn("No dataset version can be found. Ignoring.");
-        continue;
-      }
-
-      Collections.sort(versions, Collections.reverseOrder());
-
-      Collection<T> deletableVersions = 
selectionPolicy.listSelectedVersions(versions);
-
-      cleanImpl(deletableVersions);
-
-      List<DatasetVersion> allVersions = Lists.newArrayList();
-      for (T ver : versions) {
-        allVersions.add(ver);
-      }
-      for (RetentionAction retentionAction : 
versionFinderAndPolicy.getRetentionActions()) {
-        try {
-          retentionAction.execute(allVersions);
-        } catch (Throwable t) {
-          atLeastOneFailureSeen = true;
-          log.error(String.format("RetentionAction %s failed for dataset %s", 
retentionAction.getClass().getName(),
-                  this.datasetRoot()), t);
+        if (versions.isEmpty()) {
+          this.log.warn("No dataset version can be found. Ignoring.");
+          continue;
         }
+        boolean isCleanSuccess =
+            cleanDatasetVersions(versions, selectionPolicy, 
versionFinderAndPolicy.getRetentionActions());
+        atLeastOneFailureSeen = !isCleanSuccess || atLeastOneFailureSeen;
       }
     }
 
     if (atLeastOneFailureSeen) {
-      throw new RuntimeException(String.format(
-          "At least one failure happened while processing %s. Look for 
previous logs for failures", datasetRoot()));
+      throw new RuntimeException(
+          String.format("At least one failure happened while processing %s. 
Look for previous logs for failures",
+              datasetRoot()));
+    }
+  }
+
+  private boolean cleanDatasetVersions(List<T> versions, 
VersionSelectionPolicy<T> selectionPolicy,
+      List<RetentionAction> retentionActions)
+      throws IOException {
+    boolean isCleanSuccess = true;
+    Collections.sort(versions, Collections.reverseOrder());
+    Collection<T> deletableVersions = 
selectionPolicy.listSelectedVersions(versions);
+    cleanImpl(deletableVersions);
+    List<DatasetVersion> allVersions = Lists.newArrayList(versions);
+    for (RetentionAction retentionAction : retentionActions) {
+      try {
+        retentionAction.execute(allVersions);
+      } catch (Throwable t) {
+        log.error(String.format("RetentionAction %s failed for dataset %s", 
retentionAction.getClass().getName(),
+            this.datasetRoot()), t);
+        isCleanSuccess = false;
+      }
     }
+    versions.clear();
+    return isCleanSuccess;
   }
 
-  protected void cleanImpl(Collection<T> deletableVersions) throws IOException 
{
+  protected void cleanImpl(Collection<T> deletableVersions)
+      throws IOException {
     this.fsCleanableHelper.clean(deletableVersions, this);
   }
 
@@ -343,13 +376,16 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
      * @deprecated use {@link VersionFinderAndPolicyBuilder}
      */
     @Deprecated
-    public VersionFinderAndPolicy(VersionSelectionPolicy<T> 
versionSelectionPolicy, VersionFinder<? extends T> versionFinder, Config 
config) {
+    public VersionFinderAndPolicy(VersionSelectionPolicy<T> 
versionSelectionPolicy,
+        VersionFinder<? extends T> versionFinder, Config config) {
       this.versionSelectionPolicy = versionSelectionPolicy;
       this.versionFinder = versionFinder;
       this.retentionActions = Lists.newArrayList();
       this.config = config;
     }
-    public VersionFinderAndPolicy(RetentionPolicy<T> retentionPolicy, 
VersionFinder<? extends T> versionFinder, Config config) {
+
+    public VersionFinderAndPolicy(RetentionPolicy<T> retentionPolicy, 
VersionFinder<? extends T> versionFinder,
+        Config config) {
       this(new EmbeddedRetentionSelectionPolicy<>(retentionPolicy), 
versionFinder, config);
     }
 
@@ -371,8 +407,8 @@ public abstract class MultiVersionCleanableDatasetBase<T 
extends FileSystemDatas
         } else {
           localRetentionActions = Lists.newArrayList(this.retentionActions);
         }
-        return new VersionFinderAndPolicy<T>(localVersionSelectionPolicy, 
this.versionFinder,
-            localRetentionActions, this.config);
+        return new VersionFinderAndPolicy<T>(localVersionSelectionPolicy, 
this.versionFinder, localRetentionActions,
+            this.config);
       }
     }
   }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/version/finder/GlobModTimeDatasetVersionFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/version/finder/GlobModTimeDatasetVersionFinder.java
index 1b8182429..cdcdf481c 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/version/finder/GlobModTimeDatasetVersionFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/version/finder/GlobModTimeDatasetVersionFinder.java
@@ -23,6 +23,7 @@ import com.typesafe.config.Config;
 
 import org.apache.gobblin.data.management.retention.version.DatasetVersion;
 import 
org.apache.gobblin.data.management.retention.version.TimestampedDatasetVersion;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -35,14 +36,24 @@ public class GlobModTimeDatasetVersionFinder extends 
DatasetVersionFinder<Timest
   private final 
org.apache.gobblin.data.management.version.finder.GlobModTimeDatasetVersionFinder
 realVersionFinder;
   private static final String VERSION_FINDER_GLOB_PATTERN_KEY = 
"gobblin.retention.version.finder.pattern";
 
+  /**
+   * Config key that makes the wrapped version finder return dataset versions through an iterator instead of all at once.
+   * Set it to true for datasets with many versions; when unset (default false), all versions are loaded in memory, which
+   * may cause an OutOfMemoryError.
+   */
+  private static final String SHOULD_ITERATE_VERSIONS = 
"version.should.iterate";
+
   public GlobModTimeDatasetVersionFinder(FileSystem fs, Config config) {
-    this(fs, config.hasPath(VERSION_FINDER_GLOB_PATTERN_KEY) ? new 
Path(config.getString(VERSION_FINDER_GLOB_PATTERN_KEY)) : new Path("*"));
+    this(fs,
+        config.hasPath(VERSION_FINDER_GLOB_PATTERN_KEY) ? new 
Path(config.getString(VERSION_FINDER_GLOB_PATTERN_KEY))
+            : new Path("*"), ConfigUtils.getBoolean(config, 
SHOULD_ITERATE_VERSIONS, false));
   }
 
-  public GlobModTimeDatasetVersionFinder(FileSystem fs, Path globPattern) {
+  public GlobModTimeDatasetVersionFinder(FileSystem fs, Path globPattern, 
boolean useIterator) {
     super(fs);
     this.realVersionFinder =
-        new 
org.apache.gobblin.data.management.version.finder.GlobModTimeDatasetVersionFinder(fs,
 globPattern);
+        new 
org.apache.gobblin.data.management.version.finder.GlobModTimeDatasetVersionFinder(fs,
 globPattern,
+            useIterator);
   }
 
   @Override
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java
index a3b36411f..8991b2dda 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java
@@ -17,22 +17,29 @@
 
 package org.apache.gobblin.data.management.version.finder;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.Properties;
+import java.util.Stack;
+import java.util.regex.Pattern;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Lists;
-
+import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
-import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
 import org.apache.gobblin.util.PathUtils;
 
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
 
 /**
  * Class to find {@link FileSystemDataset} versions in the file system.
@@ -68,7 +75,8 @@ public abstract class AbstractDatasetVersionFinder<T extends 
FileSystemDatasetVe
    * @throws IOException
    */
   @Override
-  public Collection<T> findDatasetVersions(Dataset dataset) throws IOException 
{
+  public Collection<T> findDatasetVersions(Dataset dataset)
+      throws IOException {
     FileSystemDataset fsDataset = (FileSystemDataset) dataset;
     Path versionGlobStatus = new Path(fsDataset.datasetRoot(), 
globVersionPattern());
     FileStatus[] dataSetVersionPaths = this.fs.globStatus(versionGlobStatus);
@@ -86,6 +94,109 @@ public abstract class AbstractDatasetVersionFinder<T 
extends FileSystemDatasetVe
     return dataSetVersions;
   }
 
+  /**
+   * Lazily find the versions of the input {@link Dataset}. Dataset versions are paths under the dataset root that match
+   * {@link #globVersionPattern()}, each representing a single manageable unit in the dataset.
+   *
+   * @param dataset a {@link FileSystemDataset} whose root directory contains all versions of the dataset
+   * @return an iterator that yields each dataset version found, listing subdirectories on demand
+   * @throws IOException if listing the dataset root fails
+   */
+  @Override
+  public RemoteIterator<T> findDatasetVersion(Dataset dataset)
+      throws IOException {
+    FileSystemDataset fsDataset = (FileSystemDataset) dataset;
+    Path versionGlobStatus = new Path(fsDataset.datasetRoot(), 
globVersionPattern());
+
+    return getDatasetVersionIterator(fsDataset.datasetRoot(), 
getRegexPattern(versionGlobStatus.toString()));
+  }
+
+  /**
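+   * Returns an iterator over the dataset versions under {@code root} whose full URI path matches {@code pathPattern}.
+   * Listings are fetched lazily via a stack of iterators; every subdirectory is descended into, so the whole tree is scanned.
+   * NOTE(review): hasNext() returns false once iteratorStack is empty, even if the last fetched version was not yet returned by next().
+   * @param root - Path of the root from which the Dataset Versions have to be returned
+   * @param pathPattern - regular expression (not a glob) matched against each candidate's URI path
+   * @return - an iterator of matched dataset versions
+   * @throws IOException if listing {@code root} fails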
+   */
+  public RemoteIterator<T> getDatasetVersionIterator(Path root, String 
pathPattern)
+      throws IOException {
+    Stack<RemoteIterator<FileStatus>> iteratorStack = new Stack<>();
+    RemoteIterator<FileStatus> fsIterator = fs.listStatusIterator(root);
+    iteratorStack.push(fsIterator);
+    return new RemoteIterator<T>() {
+      Pair<FileStatus, Boolean> nextFileStatus = new MutablePair<>();
+
+      @Override
+      public boolean hasNext()
+          throws IOException {
+        if (iteratorStack.isEmpty()) {
+          return false;
+        }
+        // No need to process if the next() has not been called
+        if (nextFileStatus.getKey() != null && !nextFileStatus.getValue()) {
+          return true;
+        }
+        nextFileStatus = new MutablePair<>(fetchNextFileStatus(iteratorStack, 
pathPattern), false);
+        return nextFileStatus.getKey() != null;
+      }
+
+      @Override
+      public T next()
+          throws IOException {
+        if (nextFileStatus.getKey() == null || nextFileStatus.getValue()) {
+          if (!hasNext()) {
+            throw new NoSuchElementException();
+          }
+        }
+        T datasetVersion = 
getDatasetVersion(PathUtils.relativizePath(nextFileStatus.getKey().getPath(), 
root),
+            nextFileStatus.getKey());
+        nextFileStatus.setValue(true);
+        return datasetVersion;
+      }
+    };
+  }
+
+  /**
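+   * Helper method to find the next FileStatus whose URI path fully matches the given regex.
+   * Uses a stack of FileStatus iterators, one per directory listing still in progress; every directory visited is pushed.
+   *
+   * @param iteratorStack pending directory listings; popped and pushed by this call
+   * @param globPattern a regular expression (already converted from a glob), not a glob, despite its name
+   * @return the next matching FileStatus, or {@code null} once every listing is exhausted
+   * @throws IOException if listing a directory fails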
+   */
+  private FileStatus fetchNextFileStatus(Stack<RemoteIterator<FileStatus>> 
iteratorStack, String globPattern)
+      throws IOException {
+    while (!iteratorStack.isEmpty()) {
+      RemoteIterator<FileStatus> latestfsIterator = iteratorStack.pop();
+      while (latestfsIterator.hasNext()) {
+        FileStatus fileStatus = latestfsIterator.next();
+        if (fileStatus.isDirectory()) {
+          iteratorStack.push(fs.listStatusIterator(fileStatus.getPath()));
+        }
+        if (Pattern.matches(globPattern, 
fileStatus.getPath().toUri().getPath())) {
+          if (latestfsIterator.hasNext()) {
+            // Pushing back the current file status iterator before returning 
as there are more files to be processed
+            iteratorStack.push(latestfsIterator);
+          }
+          return fileStatus;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
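+   * Converts a glob pattern to a regex in which each {@code .*} wildcard is narrowed to {@code [^/]*}, so it stays within one path segment.
+   *
+   * @return the regex string used to match version paths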
+   */
+  private static String getRegexPattern(String globPattern) {
+    return GlobPattern.compile(globPattern).pattern().replaceAll("\\.\\*", 
"[^/]*");
+  }
+
   /**
    * Should return class of T.
    */
@@ -107,5 +218,4 @@ public abstract class AbstractDatasetVersionFinder<T 
extends FileSystemDatasetVe
    * @return {@link org.apache.gobblin.data.management.version.DatasetVersion} 
for that {@link FileStatus}.
    */
   public abstract T getDatasetVersion(Path pathRelativeToDatasetRoot, 
FileStatus versionFileStatus);
-
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/GlobModTimeDatasetVersionFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/GlobModTimeDatasetVersionFinder.java
index c71eb2d5c..ba863ee94 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/GlobModTimeDatasetVersionFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/GlobModTimeDatasetVersionFinder.java
@@ -29,6 +29,7 @@ import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
 import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -37,21 +38,31 @@ import 
org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
 public class GlobModTimeDatasetVersionFinder extends 
DatasetVersionFinder<TimestampedDatasetVersion> {
 
   private final Path globPattern;
+  private boolean useIterator;
 
   private static final String VERSION_FINDER_GLOB_PATTERN_KEY = 
"version.globPattern";
 
+  /**
+   * Config key that makes this finder return dataset versions through an iterator instead of loading them all at once.
+   * Set it to true for datasets with many versions; when unset (default false), all versions are loaded in memory, which
+   * may cause an OutOfMemoryError.
+   */
+  private static final String SHOULD_ITERATE_VERSIONS = 
"version.should.iterate";
+
   public GlobModTimeDatasetVersionFinder(FileSystem fs, Config config) {
-    this(fs, config.hasPath(VERSION_FINDER_GLOB_PATTERN_KEY)
-        ? new Path(config.getString(VERSION_FINDER_GLOB_PATTERN_KEY)) : new 
Path("*"));
+    this(fs,
+        config.hasPath(VERSION_FINDER_GLOB_PATTERN_KEY) ? new 
Path(config.getString(VERSION_FINDER_GLOB_PATTERN_KEY))
+            : new Path("*"), ConfigUtils.getBoolean(config, 
SHOULD_ITERATE_VERSIONS, false));
   }
 
   public GlobModTimeDatasetVersionFinder(FileSystem fs, Properties props) {
     this(fs, ConfigFactory.parseProperties(props));
   }
 
-  public GlobModTimeDatasetVersionFinder(FileSystem fs, Path globPattern) {
+  public GlobModTimeDatasetVersionFinder(FileSystem fs, Path globPattern, 
boolean useIterator) {
     super(fs);
     this.globPattern = globPattern;
+    this.useIterator = useIterator;
   }
 
   @Override
@@ -64,6 +75,11 @@ public class GlobModTimeDatasetVersionFinder extends 
DatasetVersionFinder<Timest
     return this.globPattern;
   }
 
+  @Override
+  public boolean useIteratorForFindingVersions(){
+    return this.useIterator;
+  }
+
   @Override
   public TimestampedDatasetVersion getDatasetVersion(Path 
pathRelativeToDatasetRoot, Path fullPath) {
     try {
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/VersionFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/VersionFinder.java
index 7b8b14f12..619fba509 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/VersionFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/VersionFinder.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.data.management.version.DatasetVersion;
 
+import org.apache.hadoop.fs.RemoteIterator;
+
 
 /**
  * Finds dataset versions.
@@ -43,5 +45,28 @@ public interface VersionFinder<T extends DatasetVersion> {
    * @return Collection of {@link DatasetVersion} for each dataset version 
found.
    * @throws IOException
    */
-  public Collection<T> findDatasetVersions(Dataset dataset) throws IOException;
+  public Collection<T> findDatasetVersions(Dataset dataset)
+      throws IOException;
+
+  /**
+   * Find dataset versions for {@link Dataset}. Each dataset versions 
represents a single manageable unit in the dataset.
+   *
+   * @param dataset
+   * @return - an iterator
+   * @throws IOException
+   */
+  public default RemoteIterator<T> findDatasetVersion(Dataset dataset)
+      throws IOException {
+    return (RemoteIterator<T>) 
findDatasetVersions(dataset).stream().iterator();
+  }
+
+  /**
+   * Returns a boolean to identify whether to use iteratorVersion or 
Collections
+   * for finding dataset versions
+   *
+   * @return
+   */
+  public default boolean useIteratorForFindingVersions() {
+    return false;
+  }
 }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicyTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicyTest.java
index d756ddc61..7f2670714 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicyTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicyTest.java
@@ -31,57 +31,40 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.data.management.version.DatasetVersion;
 
+
 /** Unit tests for {@link NewestKSelectionPolicy} */
 public class NewestKSelectionPolicyTest {
 
   private static final Map<String, Map<String, Integer>> TEST_CONFIGS =
-      ImmutableMap.<String, Map<String, Integer>>builder()
-      .put("empty", ImmutableMap.<String, Integer>builder().build())
-      .put("selectedPos", ImmutableMap.<String, Integer>builder()
-                          
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 5)
-                          .build())
-      .put("notSelectedPos", ImmutableMap.<String, Integer>builder()
-                          
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 10)
-                          .build())
-      .build();
+      ImmutableMap.<String, Map<String, Integer>>builder().put("empty", 
ImmutableMap.<String, Integer>builder().build())
+          .put("selectedPos",
+              ImmutableMap.<String, 
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 5)
+                  .build()).put("notSelectedPos",
+              ImmutableMap.<String, 
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 
10)
+                  .build()).build();
 
   private static final Map<String, Map<String, Integer>> NEG_TEST_CONFIGS =
-      ImmutableMap.<String, Map<String, Integer>>builder()
-      .put("bothProps", ImmutableMap.<String, Integer>builder()
-                          
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 5)
-                          
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 5)
-                          .build())
-      .put("selectedNeg", ImmutableMap.<String, Integer>builder()
-                          
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, -5)
-                          .build())
-      .put("notSelectedNeg", ImmutableMap.<String, Integer>builder()
-                          
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, -1)
-                          .build())
-      .put("selectedBig", ImmutableMap.<String, Integer>builder()
-                          
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY,
-                               NewestKSelectionPolicy.MAX_VERSIONS_ALLOWED + 1)
-                          .build())
-      .put("notSelectedBig", ImmutableMap.<String, Integer>builder()
-                          
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
-                              NewestKSelectionPolicy.MAX_VERSIONS_ALLOWED + 1)
-                          .build())
-      .put("selected0", ImmutableMap.<String, Integer>builder()
-                          
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 0)
-                          .build())
-      .put("notSelected0", ImmutableMap.<String, Integer>builder()
-                          
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 0)
-                          .build())
-      .build();
+      ImmutableMap.<String, Map<String, Integer>>builder().put("bothProps",
+              ImmutableMap.<String, 
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 5)
+                  
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 
5).build()).put("selectedNeg",
+              ImmutableMap.<String, 
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, -5)
+                  .build()).put("notSelectedNeg",
+              ImmutableMap.<String, 
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 
-1)
+                  .build()).put("selectedBig", ImmutableMap.<String, 
Integer>builder()
+              .put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 
NewestKSelectionPolicy.MAX_VERSIONS_ALLOWED + 1)
+              .build()).put("notSelectedBig", ImmutableMap.<String, 
Integer>builder()
+              .put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
+                  NewestKSelectionPolicy.MAX_VERSIONS_ALLOWED + 
1).build()).put("selected0",
+              ImmutableMap.<String, 
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 
0).build())
+          .put("notSelected0",
+              ImmutableMap.<String, 
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 
0)
+                  .build()).build();
 
   private static final Map<String, Integer> TEST_RESULTS =
-      ImmutableMap.<String, Integer>builder()
-      .put("empty", NewestKSelectionPolicy.VERSIONS_SELECTED_DEFAULT)
-      .put("selectedPos", 5)
-      .put("notSelectedPos", -10)
-      .build();
-
-  public static class TestStringDatasetVersion implements DatasetVersion,
-                                                      
Comparable<DatasetVersion> {
+      ImmutableMap.<String, Integer>builder().put("empty", 
NewestKSelectionPolicy.VERSIONS_SELECTED_DEFAULT)
+          .put("selectedPos", 5).put("notSelectedPos", -10).build();
+
+  public static class TestStringDatasetVersion implements DatasetVersion, 
Comparable<DatasetVersion> {
     private String _version;
 
     public TestStringDatasetVersion(String version) {
@@ -93,69 +76,77 @@ public class NewestKSelectionPolicyTest {
       if (!(o instanceof TestStringDatasetVersion)) {
         throw new RuntimeException("Incompatible version: " + o);
       }
-      return _version.compareTo(((TestStringDatasetVersion)o)._version);
+      return _version.compareTo(((TestStringDatasetVersion) o)._version);
     }
 
     @Override
     public Object getVersion() {
       return _version;
     }
-
   }
 
   @Test
   public void testCreationProps() {
-    for(Map.Entry<String, Map<String, Integer>> test: TEST_CONFIGS.entrySet()) 
{
+    for (Map.Entry<String, Map<String, Integer>> test : 
TEST_CONFIGS.entrySet()) {
       String testName = test.getKey();
       Properties testProps = new Properties();
-      for (Map.Entry<String, Integer> prop: test.getValue().entrySet()) {
+      for (Map.Entry<String, Integer> prop : test.getValue().entrySet()) {
         testProps.setProperty(prop.getKey(), prop.getValue().toString());
       }
       NewestKSelectionPolicy policy = new NewestKSelectionPolicy(testProps);
-      Assert.assertEquals(policy.getVersionsSelected(),
-                          Math.abs(TEST_RESULTS.get(testName).intValue()),
-                          "Failure for test " + testName);
+      Assert.assertEquals(policy.getVersionsSelected(), 
Math.abs(TEST_RESULTS.get(testName).intValue()),
+          "Failure for test " + testName);
       Assert.assertEquals(policy.isExcludeMode(), 
TEST_RESULTS.get(testName).intValue() < 0,
-                          "Failure for test " + testName);
+          "Failure for test " + testName);
     }
 
-    for(Map.Entry<String, Map<String, Integer>> test: 
NEG_TEST_CONFIGS.entrySet()) {
+    for (Map.Entry<String, Map<String, Integer>> test : 
NEG_TEST_CONFIGS.entrySet()) {
       String testName = test.getKey();
       Properties testProps = new Properties();
-      for (Map.Entry<String, Integer> prop: test.getValue().entrySet()) {
+      for (Map.Entry<String, Integer> prop : test.getValue().entrySet()) {
         testProps.setProperty(prop.getKey(), prop.getValue().toString());
       }
       try {
         new NewestKSelectionPolicy(testProps);
         Assert.fail("Exception expected for test " + testName);
-      }
-      catch (RuntimeException e) {
+      } catch (RuntimeException e) {
         //OK
       }
     }
   }
 
+  @Test(expectedExceptions = RuntimeException.class)
+  public void testCreationProps_useIteratorException() {
+    for (Map.Entry<String, Map<String, Integer>> test : 
TEST_CONFIGS.entrySet()) {
+      String testName = test.getKey();
+      Properties testProps = new Properties();
+      for (Map.Entry<String, Integer> prop : test.getValue().entrySet()) {
+        testProps.setProperty(prop.getKey(), prop.getValue().toString());
+      }
+      testProps.setProperty(NewestKSelectionPolicy.SHOULD_ITERATE_VERSIONS, 
"true");
+      NewestKSelectionPolicy policy = new NewestKSelectionPolicy(testProps);
+    }
+  }
+
   @Test
   public void testCreationConfig() {
-    for(Map.Entry<String, Map<String, Integer>> test: TEST_CONFIGS.entrySet()) 
{
+    for (Map.Entry<String, Map<String, Integer>> test : 
TEST_CONFIGS.entrySet()) {
       String testName = test.getKey();
       Config conf = ConfigFactory.parseMap(test.getValue());
       NewestKSelectionPolicy policy = new NewestKSelectionPolicy(conf);
-      Assert.assertEquals(policy.getVersionsSelected(),
-                          Math.abs(TEST_RESULTS.get(testName).intValue()),
-                          "Failure for test " + testName);
+      Assert.assertEquals(policy.getVersionsSelected(), 
Math.abs(TEST_RESULTS.get(testName).intValue()),
+          "Failure for test " + testName);
       Assert.assertEquals(policy.isExcludeMode(), 
TEST_RESULTS.get(testName).intValue() < 0,
-                          "Failure for test " + testName);
+          "Failure for test " + testName);
     }
 
-    for(Map.Entry<String, Map<String, Integer>> test: 
NEG_TEST_CONFIGS.entrySet()) {
+    for (Map.Entry<String, Map<String, Integer>> test : 
NEG_TEST_CONFIGS.entrySet()) {
       String testName = test.getKey();
       Config conf = ConfigFactory.parseMap(test.getValue());
       try {
         new NewestKSelectionPolicy(conf);
         Assert.fail("Exception expected for test " + testName);
-      }
-      catch (RuntimeException e) {
+      } catch (RuntimeException e) {
         // OK
       }
     }
@@ -170,47 +161,42 @@ public class NewestKSelectionPolicyTest {
 
     //selectedVersions 5 < 10
     Config conf = ConfigFactory.empty()
-          .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY,
-                     ConfigValueFactory.fromAnyRef(5));
+        .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 
ConfigValueFactory.fromAnyRef(5));
     NewestKSelectionPolicy policy = new NewestKSelectionPolicy(conf);
     Collection<DatasetVersion> res = policy.listSelectedVersions(versions);
     int idx = 0;
     Assert.assertEquals(res.size(), policy.getVersionsSelected());
-    for (DatasetVersion v: res) {
+    for (DatasetVersion v : res) {
       Assert.assertEquals(v, versions.get(idx++), "Mismatch for index " + idx);
     }
 
     //selectedVersions 15 > 10
     conf = ConfigFactory.empty()
-          .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY,
-                     ConfigValueFactory.fromAnyRef(15));
+        .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 
ConfigValueFactory.fromAnyRef(15));
     policy = new NewestKSelectionPolicy(conf);
     res = policy.listSelectedVersions(versions);
     idx = 0;
     Assert.assertEquals(res.size(), versions.size());
-    for (DatasetVersion v: res) {
+    for (DatasetVersion v : res) {
       Assert.assertEquals(v, versions.get(idx++), "Mismatch for index " + idx);
     }
 
     //notSelectedVersions 4 < 10
     conf = ConfigFactory.empty()
-          .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
-                     ConfigValueFactory.fromAnyRef(4));
+        .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 
ConfigValueFactory.fromAnyRef(4));
     policy = new NewestKSelectionPolicy(conf);
     res = policy.listSelectedVersions(versions);
     idx = policy.getVersionsSelected();
     Assert.assertEquals(res.size(), versions.size() - 
policy.getVersionsSelected());
-    for (DatasetVersion v: res) {
+    for (DatasetVersion v : res) {
       Assert.assertEquals(v, versions.get(idx++), "Mismatch for index " + idx);
     }
 
     //notSelectedVersions 14 > 10
     conf = ConfigFactory.empty()
-          .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
-                     ConfigValueFactory.fromAnyRef(14));
+        .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 
ConfigValueFactory.fromAnyRef(14));
     policy = new NewestKSelectionPolicy(conf);
     res = policy.listSelectedVersions(versions);
     Assert.assertEquals(res.size(), 0);
   }
-
 }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetBaseTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetBaseTest.java
index 4d3fc3b04..58f8a3976 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetBaseTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetBaseTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -45,8 +46,8 @@ import org.apache.gobblin.data.management.trash.Trash;
 public class CleanableDatasetBaseTest {
 
   @Test
-  public void test() throws IOException {
-
+  public void test()
+      throws IOException {
     FileSystem fs = mock(FileSystem.class);
 
     Path datasetRoot = new Path("/test/dataset");
@@ -59,19 +60,21 @@ public class CleanableDatasetBaseTest {
 
     DatasetImpl dataset = new DatasetImpl(fs, false, false, false, false, 
datasetRoot);
 
-    when(dataset.versionFinder.findDatasetVersions(dataset)).
-        thenReturn(Lists.newArrayList(dataset1Version1, dataset1Version2));
+    
when(dataset.versionFinder.useIteratorForFindingVersions()).thenReturn(false);
+
+    when(dataset.versionFinder.findDatasetVersions(dataset)).thenReturn(
+        Lists.newArrayList(dataset1Version1, dataset1Version2));
 
     dataset.clean();
 
     Assert.assertEquals(dataset.getTrash().getDeleteOperations().size(), 1);
     Assert.assertTrue(dataset.getTrash().getDeleteOperations().get(0).getPath()
         .equals(dataset1Version2.getPathsToDelete().iterator().next()));
-
   }
 
   @Test
-  public void testSkipTrash() throws IOException {
+  public void testSkipTrash()
+      throws IOException {
 
     FileSystem fs = mock(FileSystem.class);
     Trash trash = mock(Trash.class);
@@ -86,8 +89,8 @@ public class CleanableDatasetBaseTest {
     when(fs.exists(any(Path.class))).thenReturn(true);
     DatasetImpl dataset = new DatasetImpl(fs, false, true, false, false, 
datasetRoot);
 
-    when(dataset.versionFinder.findDatasetVersions(dataset)).
-        thenReturn(Lists.newArrayList(dataset1Version1, dataset1Version2));
+    when(dataset.versionFinder.findDatasetVersions(dataset)).thenReturn(
+        Lists.newArrayList(dataset1Version1, dataset1Version2));
 
     dataset.clean();
 
@@ -96,15 +99,14 @@ public class CleanableDatasetBaseTest {
         .equals(dataset1Version2.getPathsToDelete().iterator().next()));
 
     Assert.assertTrue(dataset.getTrash().isSkipTrash());
-
   }
 
   @Test
-  public void testSimulate() throws IOException {
+  public void testSimulate()
+      throws IOException {
 
     FileSystem fs = mock(FileSystem.class);
     Trash trash = mock(Trash.class);
-
     Path datasetRoot = new Path("/test/dataset");
 
     DatasetVersion dataset1Version1 = new StringDatasetVersion("version1", new 
Path(datasetRoot, "version1"));
@@ -115,8 +117,8 @@ public class CleanableDatasetBaseTest {
     when(fs.exists(any(Path.class))).thenReturn(true);
     DatasetImpl dataset = new DatasetImpl(fs, true, false, false, false, 
datasetRoot);
 
-    when(dataset.versionFinder.findDatasetVersions(dataset)).
-        thenReturn(Lists.newArrayList(dataset1Version1, dataset1Version2));
+    when(dataset.versionFinder.findDatasetVersions(dataset)).thenReturn(
+        Lists.newArrayList(dataset1Version1, dataset1Version2));
 
     dataset.clean();
 
@@ -125,11 +127,11 @@ public class CleanableDatasetBaseTest {
         .equals(dataset1Version2.getPathsToDelete().iterator().next()));
 
     Assert.assertTrue(dataset.getTrash().isSimulate());
-
   }
 
   @Test
-  public void testDeleteEmptyDirectories() throws IOException {
+  public void testDeleteEmptyDirectories()
+      throws IOException {
     FileSystem fs = mock(FileSystem.class);
     Trash trash = mock(Trash.class);
 
@@ -142,12 +144,9 @@ public class CleanableDatasetBaseTest {
     when(trash.moveToTrash(any(Path.class))).thenReturn(true);
     when(fs.exists(any(Path.class))).thenReturn(true);
     DatasetImpl dataset = new DatasetImpl(fs, false, false, true, false, 
datasetRoot);
-
-    when(dataset.versionFinder.findDatasetVersions(dataset)).
-        thenReturn(Lists.newArrayList(dataset1Version1, dataset1Version2));
-
     when(fs.listStatus(any(Path.class))).thenReturn(new FileStatus[]{});
-
+    when(dataset.versionFinder.findDatasetVersions(dataset)).thenReturn(
+        Lists.newArrayList(dataset1Version1, dataset1Version2));
     dataset.clean();
 
     Assert.assertEquals(dataset.getTrash().getDeleteOperations().size(), 1);
@@ -158,7 +157,40 @@ public class CleanableDatasetBaseTest {
     
verify(fs).delete(dataset1Version2.getPathsToDelete().iterator().next().getParent(),
 false);
     verify(fs, times(1)).delete(any(Path.class), eq(false));
     verify(fs, never()).delete(any(Path.class), eq(true));
+  }
+
+  @Test
+  public void testCleanIteratorFinder()
+      throws IOException {
+    FileSystem fs = mock(FileSystem.class);
+
+    Path datasetRoot = new Path("/test/dataset");
+
+    DatasetVersion dataset1Version1 = new StringDatasetVersion("version1", new 
Path(datasetRoot, "version1"));
+    DatasetVersion dataset1Version2 = new StringDatasetVersion("version2", new 
Path(datasetRoot, "version2"));
+
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    when(fs.exists(any(Path.class))).thenReturn(true);
+
+    DatasetImpl dataset = new DatasetImpl(fs, false, false, false, false, 
datasetRoot);
+
+    
when(dataset.versionFinder.useIteratorForFindingVersions()).thenReturn(false);
+    
when(dataset.versionFinder.useIteratorForFindingVersions()).thenReturn(true);
 
+    mockDatasetVersion(dataset, dataset1Version1, dataset1Version2);
+    dataset.clean();
+
+    Assert.assertEquals(dataset.getTrash().getDeleteOperations().size(), 1);
+    Assert.assertTrue(dataset.getTrash().getDeleteOperations().get(0).getPath()
+        .equals(dataset1Version2.getPathsToDelete().iterator().next()));
+  }
+
+  private void mockDatasetVersion(DatasetImpl dataset, DatasetVersion 
dataset1Version1, DatasetVersion dataset1Version2)
+      throws IOException {
+    RemoteIterator<DatasetVersion> versionRemoteIterator = 
mock(RemoteIterator.class);
+    
when(dataset.versionFinder.findDatasetVersion(dataset)).thenReturn(versionRemoteIterator);
+    when(versionRemoteIterator.hasNext()).thenReturn(true, true, false);
+    
when(versionRemoteIterator.next()).thenReturn(dataset1Version1).thenReturn(dataset1Version2);
   }
 
   private class DeleteFirstRetentionPolicy implements 
RetentionPolicy<StringDatasetVersion> {
@@ -179,8 +211,9 @@ public class CleanableDatasetBaseTest {
     public RetentionPolicy retentionPolicy = new DeleteFirstRetentionPolicy();
     public Path path;
 
-    public DatasetImpl(FileSystem fs, boolean simulate, boolean skipTrash,
-        boolean deleteEmptyDirectories, boolean deleteAsOwner, Path path) 
throws IOException {
+    public DatasetImpl(FileSystem fs, boolean simulate, boolean skipTrash, 
boolean deleteEmptyDirectories,
+        boolean deleteAsOwner, Path path)
+        throws IOException {
       super(fs, TestTrash.propertiesForTestTrash(), simulate, skipTrash, 
deleteEmptyDirectories, deleteAsOwner,
           LoggerFactory.getLogger(DatasetImpl.class));
       
when(versionFinder.versionClass()).thenReturn(StringDatasetVersion.class);
@@ -206,5 +239,4 @@ public class CleanableDatasetBaseTest {
       return (TestTrash) this.trash;
     }
   }
-
 }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/RetentionIntegrationTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/RetentionIntegrationTest.java
index 68c25f4f1..f5d99997c 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/RetentionIntegrationTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/RetentionIntegrationTest.java
@@ -97,7 +97,9 @@ public class RetentionIntegrationTest {
         { "testCombinePolicy", "retention.job" },
         { "testCombinePolicy", "selection.conf" },
         { "testTimeBasedAccessControl", "selection.conf" },
-        { "testMultiVersionAccessControl", 
"daily-retention-with-accessControl.conf" }
+        { "testMultiVersionAccessControl", 
"daily-retention-with-accessControl.conf" },
+        { "testTimeBasedRetentionWithVersionIterator", "retention.conf"},
+        { "testTimeBasedRetentionWithVersionIterator", "selection.conf" }
     };
   }
 
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinderTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinderTest.java
new file mode 100644
index 000000000..48f39ed16
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinderTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.version.finder;
+
+import com.typesafe.config.ConfigFactory;
+
+import java.io.IOException;
+
+import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+@Test(groups = {"gobblin.data.management.version"})
+public class AbstractDatasetVersionFinderTest {
+
+  private FileSystem fs = Mockito.mock(FileSystem.class);
+
+  private AbstractDatasetVersionFinder _abstractDatasetVersionFinder =
+      new GlobModTimeDatasetVersionFinder(fs, ConfigFactory.empty());
+
+  @Test
+  void testGetDatasetVersionIterator()
+      throws IOException {
+    Path root = new Path("/root");
+    String globPattern = "/root/.*/.*";
+
+    FileStatus dir1 = new FileStatus(0, true, 1, 0, 123141, new 
Path("/root/dir1"));
+    FileStatus dir2 = new FileStatus(0, true, 1, 0, 124124, new 
Path("/root/dir1/dir2"));
+
+    RemoteIterator<FileStatus> rootIterator = 
Mockito.mock(RemoteIterator.class);
+    when(fs.listStatusIterator(root)).thenReturn(rootIterator);
+    when(rootIterator.hasNext()).thenReturn(true, false);
+    when(rootIterator.next()).thenReturn(dir1);
+
+    RemoteIterator<FileStatus> dir1Iterator = 
Mockito.mock(RemoteIterator.class);
+    when(fs.listStatusIterator(dir1.getPath())).thenReturn(dir1Iterator);
+    when(dir1Iterator.hasNext()).thenReturn(true, false);
+    when(dir1Iterator.next()).thenReturn(dir2);
+
+    RemoteIterator<FileStatus> dir2Iterator = 
Mockito.mock(RemoteIterator.class);
+    when(fs.listStatusIterator(dir2.getPath())).thenReturn(dir2Iterator);
+    when(dir2Iterator.hasNext()).thenReturn(false);
+
+    when(fs.getFileStatus(dir1.getPath())).thenReturn(dir1);
+    when(fs.getFileStatus(dir2.getPath())).thenReturn(dir2);
+
+    RemoteIterator<TimestampedDatasetVersion> resultIterator =
+        _abstractDatasetVersionFinder.getDatasetVersionIterator(root, 
globPattern);
+
+    // Verify result
+    Assert.assertTrue(resultIterator.hasNext());
+    Assert.assertEquals(dir2.getPath(), resultIterator.next().getPath());
+    Assert.assertFalse(resultIterator.hasNext());
+
+    // Verify interactions with mocks
+    verify(fs, times(1)).listStatusIterator(root);
+    verify(rootIterator, times(2)).hasNext();
+    verify(rootIterator, times(1)).next();
+  }
+}
diff --git 
a/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/retention.conf
 
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/retention.conf
new file mode 100644
index 000000000..1a8c998c2
--- /dev/null
+++ 
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/retention.conf
@@ -0,0 +1,10 @@
+gobblin.retention : {
+       # ${testNameTempPath} is resolved at runtime by the test
+    dataset.pattern=${testNameTempPath}"/user/gobblin/*"
+    timebased.duration = P7D
+
+    
dataset.finder.class=org.apache.gobblin.data.management.retention.profile.ManagedCleanableDatasetFinder
+    
retention.policy.class=org.apache.gobblin.data.management.retention.policy.TimeBasedRetentionPolicy
+    
version.finder.class=org.apache.gobblin.data.management.retention.version.finder.GlobModTimeDatasetVersionFinder
+    version.finder.iterator=true
+}
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/selection.conf
 
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/selection.conf
new file mode 100644
index 000000000..69ded90c4
--- /dev/null
+++ 
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/selection.conf
@@ -0,0 +1,18 @@
+gobblin.retention : {
+
+    dataset : {
+      # ${testNameTempPath} is resolved at runtime by the test
+      pattern=${testNameTempPath}"/user/gobblin/*"
+      
finder.class=org.apache.gobblin.data.management.retention.profile.ManagedCleanableDatasetFinder
+    }
+
+    selection : {
+      
policy.class=org.apache.gobblin.data.management.policy.SelectBeforeTimeBasedPolicy
+      timeBased.lookbackTime=7d
+    }
+
+    version : {
+      
finder.class=org.apache.gobblin.data.management.version.finder.GlobModTimeDatasetVersionFinder
+      finder.iterator=true
+    }
+}
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/setup_validate.conf
 
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/setup_validate.conf
new file mode 100644
index 000000000..0f2e08cdb
--- /dev/null
+++ 
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/setup_validate.conf
@@ -0,0 +1,22 @@
+gobblin.test : {
+    currentTime : "02/19/2016 11:00:00"
+    create : [
+      {path:"/user/gobblin/Dataset1/Version1", modTime:"02/10/2016 10:00:00"},
+      {path:"/user/gobblin/Dataset1/Version2", modTime:"02/11/2016 10:00:00"},
+      {path:"/user/gobblin/Dataset1/Version3", modTime:"02/12/2016 10:00:00"},
+      {path:"/user/gobblin/Dataset1/Version4", modTime:"02/13/2016 10:00:00"},
+      {path:"/user/gobblin/Dataset1/Version5", modTime:"02/14/2016 10:00:00"}
+    ]
+
+    validate : {
+      retained : [
+        {path:"/user/gobblin/Dataset1/Version4", modTime:"02/13/2016 
10:00:00"},
+        {path:"/user/gobblin/Dataset1/Version5", modTime:"02/14/2016 10:00:00"}
+      ]
+      deleted : [
+        {path:"/user/gobblin/Dataset1/Version1", modTime:"02/10/2016 
10:00:00"},
+        {path:"/user/gobblin/Dataset1/Version2", modTime:"02/11/2016 
10:00:00"},
+        {path:"/user/gobblin/Dataset1/Version3", modTime:"02/12/2016 10:00:00"}
+      ]
+    }
+}
\ No newline at end of file

Reply via email to