arjun4084346 commented on code in PR #3883:
URL: https://github.com/apache/gobblin/pull/3883#discussion_r1503272965
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java:
##########
@@ -86,6 +87,110 @@ public Collection<T> findDatasetVersions(Dataset dataset)
throws IOException {
return dataSetVersions;
}
+ /**
+ * Finds the dataset versions in the input {@link Dataset} and exposes them through an iterator.
+ * Dataset versions are subdirectories of the input dataset, each representing a single
+ * manageable unit in the dataset.
+ *
+ * <p>The dataset is cast to {@link FileSystemDataset}. Its root joined with
+ * {@code globVersionPattern()} is converted by {@code getRegexPattern} into the path pattern
+ * that is handed, together with the dataset root, to
+ * {@link #getDatasetVersionIterator(Path, String)}.
+ *
+ * @param dataset dataset whose root directory contains all versions of the dataset; must be a
+ * {@link FileSystemDataset}, otherwise the cast fails with a {@link ClassCastException}
+ * @return an iterator for fetching each dataset version found
+ * @throws IOException if creating the dataset version iterator fails
+ */
+ @Override
+ public RemoteIterator<T> findDatasetVersion(Dataset dataset) throws
IOException {
+ FileSystemDataset fsDataset = (FileSystemDataset) dataset;
+ Path versionGlobStatus = new Path(fsDataset.datasetRoot(),
globVersionPattern());
+ return getDatasetVersionIterator(fsDataset.datasetRoot(),
getRegexPattern(versionGlobStatus.toString()));
+ }
+
+ /**
+ * Returns an iterator that yields a dataset version for every
+ * {@link org.apache.hadoop.fs.FileStatus} under {@code root} that
+ * {@code fetchNextFileStatus} selects for {@code pathPattern}.
+ *
+ * <p>The listing iterator for {@code root} is opened when this method is called and pushed on
+ * a stack; all further work happens on demand. {@code hasNext()} fetches the next selected
+ * status and caches it, and {@code next()} converts the cached status with
+ * {@code getDatasetVersion}, passing the status path relativized against {@code root}.
+ * {@code next()} throws {@link NoSuchElementException} when no further status is available.
+ *
+ * <p>NOTE(review): {@code hasNext()} tests for an empty stack before it tests for a fetched but
+ * not yet consumed status, whereas {@code next()} returns such a status without consulting the
+ * stack. If {@code fetchNextFileStatus} can return a match while leaving the stack empty, a
+ * repeated {@code hasNext()} call would report {@code false} for an element that
+ * {@code next()} would still return - confirm against {@code fetchNextFileStatus}.
+ *
+ * @param root - Path of the root from which the dataset versions have to be returned; also the
+ * base against which each returned path is relativized
+ * @param pathPattern - Pattern to match the dataset version path; passed unchanged to
+ * {@code fetchNextFileStatus}
+ * @return - an iterator of matched dataset versions
+ * @throws IOException if opening the listing iterator for {@code root} fails
+ */
+ public RemoteIterator<T> getDatasetVersionIterator(Path root, String
pathPattern) throws IOException {
+ Stack<RemoteIterator<FileStatus>> iteratorStack = new Stack<>();
+ RemoteIterator<FileStatus> fsIterator = fs.listStatusIterator(root);
+ iteratorStack.push(fsIterator);
+ return new RemoteIterator<T>() {
+ FileStatus nextFileStatus = null; // status cached by the latest fetch in hasNext()
+ boolean isNextFileStatusProcessed = false; // true once next() has returned nextFileStatus
+
+ @Override
+ public boolean hasNext() throws IOException {
+ if (iteratorStack.isEmpty()) {
+ return false;
+ }
+ // A status fetched by an earlier hasNext() is still unconsumed, so do not fetch again
+ if (nextFileStatus != null && !isNextFileStatusProcessed) {
+ return true;
+ }
+ nextFileStatus = fetchNextFileStatus(iteratorStack, pathPattern);
+ isNextFileStatusProcessed = false;
+ return nextFileStatus != null;
+ }
+
+ @Override
+ public T next() throws IOException {
+ if (nextFileStatus == null || isNextFileStatusProcessed) { // nothing cached, or cache already consumed
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ }
+ T datasetVersion =
getDatasetVersion(PathUtils.relativizePath(nextFileStatus.getPath(), root),
nextFileStatus);
+ isNextFileStatusProcessed = true;
+ return datasetVersion;
+ }
+ };
+ }
+
+ /**
+ * Helper method to find the next file status matching the globPattern.
+ * This uses a stack to keep track of the file status iterators returned for each subpath.
+ *
+ * @param iteratorStack
+ * @param globPattern
+ * @return
+ * @throws IOException
+ */
+ private FileStatus fetchNextFileStatus(Stack<RemoteIterator<FileStatus>>
iteratorStack,
+ String globPattern) throws IOException {
+ while (!iteratorStack.isEmpty()) {
+ RemoteIterator<FileStatus> latestfsIterator = iteratorStack.pop();
+ while (latestfsIterator.hasNext()) {
+ FileStatus fileStatus = latestfsIterator.next();
+ if (fileStatus.isDirectory()) {
+ iteratorStack.push(fs.listStatusIterator(fileStatus.getPath()));
Review Comment:
You are doing it this way because `listStatusIterator` does not accept a
glob pattern?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]