arjun4084346 commented on code in PR #3883:
URL: https://github.com/apache/gobblin/pull/3883#discussion_r1502913543


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java:
##########
@@ -86,6 +87,110 @@ public Collection<T> findDatasetVersions(Dataset dataset) throws IOException {
     return dataSetVersions;
   }
 
+  /**
+   * Find dataset version in the input {@link org.apache.gobblin.dataset}. Dataset versions are subdirectories of the
+   * input {@link org.apache.gobblin.dataset} representing a single manageable unit in the dataset.
+   *
+   * @param dataset {@link org.apache.gobblin.dataset} to directory containing all versions of a dataset
+   * @return - Returns an iterator for fetching each dataset version found.
+   * @throws IOException
+   */
+  @Override
+  public RemoteIterator<T> findDatasetVersion(Dataset dataset) throws IOException {
+    FileSystemDataset fsDataset = (FileSystemDataset) dataset;
+    Path versionGlobStatus = new Path(fsDataset.datasetRoot(), globVersionPattern());
+    return getDatasetVersionIterator(fsDataset.datasetRoot(), getRegexPattern(versionGlobStatus.toString()));
+  }
+
+  /**
+   * Returns an iterator to fetch the dataset versions for the datasets whose path {@link org.apache.hadoop.fs.Path}
+   * starts with the root and matches the globPattern passed
+   *
+   * @param root - Path of the root from which the Dataset Versions have to be returned
+   * @param pathPattern - Pattern to match the dataset version path
+   * @return - an iterator of matched data versions
+   * @throws IOException
+   */
+  public RemoteIterator<T> getDatasetVersionIterator(Path root, String pathPattern) throws IOException {
+    Stack<RemoteIterator<FileStatus>> iteratorStack = new Stack<>();
+    RemoteIterator<FileStatus> fsIterator = fs.listStatusIterator(root);
+    iteratorStack.push(fsIterator);
+    return new RemoteIterator<T>() {
+      FileStatus nextFileStatus = null;
+      boolean isNextFileStatusProcessed = false;
+
+      @Override
+      public boolean hasNext() throws IOException {
+        if (iteratorStack.isEmpty()) {
+          return false;
+        }
+        // No need to process if the next() has not been called
+        if (nextFileStatus != null && !isNextFileStatusProcessed) {
+          return true;
+        }
+        nextFileStatus = fetchNextFileStatus(iteratorStack, pathPattern);
+        isNextFileStatusProcessed = false;

Review Comment:
   Is this iterator thread-safe?
   Also, could you check whether Guava's `Iterators.transform()` can be used here?
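
To make the `Iterators.transform()` idea concrete: Guava's method takes a `java.util.Iterator`, while Hadoop's `RemoteIterator` is a separate interface whose `hasNext()` and `next()` throw `IOException`, so it may need a small adapter rather than a direct call. Below is a minimal sketch of a transform-style wrapper, not a proposal for the final code. `IoIterator`, `transform`, and `fromIterator` are hypothetical names; `IoIterator` only stands in for `RemoteIterator` so the example compiles without Hadoop on the classpath.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

public class RemoteIteratorSketch {

  /** Hypothetical stand-in for org.apache.hadoop.fs.RemoteIterator (same two methods). */
  public interface IoIterator<E> {
    boolean hasNext() throws IOException;
    E next() throws IOException;
  }

  /**
   * Lazily maps each element of the source, in the spirit of Guava's
   * Iterators.transform(), but for iterators that may throw IOException.
   * Like the source it wraps, the result is NOT thread-safe: callers that
   * share it across threads need external synchronization around the
   * hasNext()/next() pair.
   */
  public static <F, T> IoIterator<T> transform(
      IoIterator<F> source, Function<? super F, ? extends T> fn) {
    return new IoIterator<T>() {
      @Override
      public boolean hasNext() throws IOException {
        return source.hasNext();
      }

      @Override
      public T next() throws IOException {
        return fn.apply(source.next());
      }
    };
  }

  /** Demo helper: adapts a plain java.util.Iterator to the stand-in interface. */
  public static <E> IoIterator<E> fromIterator(Iterator<E> it) {
    return new IoIterator<E>() {
      @Override
      public boolean hasNext() {
        return it.hasNext();
      }

      @Override
      public E next() {
        return it.next();
      }
    };
  }

  public static void main(String[] args) throws IOException {
    // Illustrative paths only; a real caller would wrap fs.listStatusIterator(root).
    IoIterator<String> paths =
        fromIterator(Arrays.asList("/data/v1", "/data/v2").iterator());
    IoIterator<Integer> lengths = transform(paths, String::length);
    while (lengths.hasNext()) {
      System.out.println(lengths.next());
    }
  }
}
```

A wrapper like this covers only the one-to-one mapping from `FileStatus` to a dataset version. The recursive directory walk and pattern filtering in the PR would still need their own logic.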



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
