arpit09 commented on code in PR #3883:
URL: https://github.com/apache/gobblin/pull/3883#discussion_r1512067188
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java:
##########
@@ -86,6 +87,110 @@ public Collection<T> findDatasetVersions(Dataset dataset)
throws IOException {
return dataSetVersions;
}
+  /**
+   * Finds dataset versions under the input {@link org.apache.gobblin.dataset.Dataset}. Dataset versions are
+   * subdirectories of the input dataset's root, each representing a single manageable unit in the dataset.
+   *
+   * @param dataset {@link org.apache.gobblin.dataset.Dataset} whose root directory contains all versions of the dataset
+   * @return an iterator over each dataset version found
+   * @throws IOException if listing the dataset root fails
+   */
+  @Override
+  public RemoteIterator<T> findDatasetVersion(Dataset dataset) throws IOException {
+    FileSystemDataset fsDataset = (FileSystemDataset) dataset;
+    Path versionGlobPath = new Path(fsDataset.datasetRoot(), globVersionPattern());
+    return getDatasetVersionIterator(fsDataset.datasetRoot(), getRegexPattern(versionGlobPath.toString()));
+  }
+
+  /**
+   * Returns an iterator over the dataset versions whose {@link org.apache.hadoop.fs.Path}
+   * starts with the given root and matches the supplied pattern.
+   *
+   * @param root root path under which dataset versions are searched
+   * @param pathPattern regex pattern a dataset version path must match
+   * @return an iterator over the matched dataset versions
+   * @throws IOException if listing the root fails
+   */
+  public RemoteIterator<T> getDatasetVersionIterator(Path root, String pathPattern) throws IOException {
+    Stack<RemoteIterator<FileStatus>> iteratorStack = new Stack<>();
+    RemoteIterator<FileStatus> fsIterator = fs.listStatusIterator(root);
+    iteratorStack.push(fsIterator);
+    return new RemoteIterator<T>() {
+      FileStatus nextFileStatus = null;
+      boolean isNextFileStatusProcessed = false;
+
+      @Override
+      public boolean hasNext() throws IOException {
+        // Reuse the already-fetched element if next() has not consumed it yet;
+        // check this before the stack, which may already be drained
+        if (nextFileStatus != null && !isNextFileStatusProcessed) {
+          return true;
+        }
+        if (iteratorStack.isEmpty()) {
+          return false;
+        }
+        nextFileStatus = fetchNextFileStatus(iteratorStack, pathPattern);
+        isNextFileStatusProcessed = false;
Review Comment:
I have moved `nextFileStatus` and `isNextFileStatusProcessed` as a pair, to avoid partial updates of these two variables. Iterators are usually not thread-safe; the caller should avoid sharing an iterator among threads.
`Iterators.transform()` is mostly used when we already have a collection and want to iterate over it. In our case we don't want to bring a collection of dataset versions into memory. We could still pass this iterator to `transform()`; is that what you are referring to here? What advantage would we gain from it?
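For reference, `Iterators.transform()` works on any iterator, not just collection-backed ones, and it is lazy, so it would not force the versions into memory. Below is a dependency-free sketch of the same idea in plain Java (class and method names here are illustrative, not from the PR):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hedged sketch of the idea behind Guava's Iterators.transform(): wrap an
// existing iterator lazily, so no collection is ever materialized in memory.
public class LazyTransform {
  static <A, B> Iterator<B> transform(Iterator<A> source, Function<A, B> fn) {
    return new Iterator<B>() {
      @Override public boolean hasNext() { return source.hasNext(); }
      // Each element is converted only when next() is called
      @Override public B next() { return fn.apply(source.next()); }
    };
  }

  public static void main(String[] args) {
    Iterator<String> versions = transform(List.of("v1", "v2").iterator(), p -> "version-" + p);
    while (versions.hasNext()) {
      System.out.println(versions.next());  // prints version-v1, then version-v2
    }
  }
}
```

The catch for this PR is that `Iterators.transform()` operates on `java.util.Iterator`, while Hadoop's `RemoteIterator` is a distinct interface (its methods throw `IOException`), so an adapter would be needed either way.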
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java:
##########
@@ -17,21 +17,22 @@
package org.apache.gobblin.data.management.version.finder;
+import com.google.common.collect.Lists;
Review Comment:
Done, thanks for pointing it out.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java:
##########
@@ -275,39 +278,48 @@ public void clean() throws IOException {
      this.log.info(String.format("Cleaning dataset %s. Using version finder %s and policy %s", this,
          versionFinder.getClass().getName(), selectionPolicy));
-
-      List<T> versions = Lists.newArrayList(versionFinder.findDatasetVersions(this));
-
-      if (versions.isEmpty()) {
-        this.log.warn("No dataset version can be found. Ignoring.");
-        continue;
-      }
-
-      Collections.sort(versions, Collections.reverseOrder());
-
-      Collection<T> deletableVersions = selectionPolicy.listSelectedVersions(versions);
-
-      cleanImpl(deletableVersions);
-
-      List<DatasetVersion> allVersions = Lists.newArrayList();
-      for (T ver : versions) {
-        allVersions.add(ver);
-      }
-      for (RetentionAction retentionAction : versionFinderAndPolicy.getRetentionActions()) {
-        try {
-          retentionAction.execute(allVersions);
-        } catch (Throwable t) {
-          atLeastOneFailureSeen = true;
-          log.error(String.format("RetentionAction %s failed for dataset %s", retentionAction.getClass().getName(),
-              this.datasetRoot()), t);
+      // Avoid OOM by iterating instead of loading all dataset versions into memory
+      RemoteIterator<? extends T> versionRemoteIterator = versionFinder.findDatasetVersion(this);
+      // Clean dataset versions in batches of CLEANABLE_DATASET_BATCH_SIZE to avoid OOM
+      List<T> cleanableVersionsBatch = new ArrayList<>();
+      while (versionRemoteIterator.hasNext()) {
+        T version = versionRemoteIterator.next();
+        cleanableVersionsBatch.add(version);
+        if (cleanableVersionsBatch.size() >= CLEANABLE_DATASET_BATCH_SIZE) {
+          boolean isCleanSuccess = cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy, versionFinderAndPolicy);
+          atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
+          cleanableVersionsBatch.clear();  // reset the batch so versions are not re-cleaned and memory stays bounded
        }
      }
+      if (!cleanableVersionsBatch.isEmpty()) {
+        boolean isCleanSuccess = cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy, versionFinderAndPolicy);
+        atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
+      }
}
if (atLeastOneFailureSeen) {
-      throw new RuntimeException(String.format(
-          "At least one failure happened while processing %s. Look for previous logs for failures", datasetRoot()));
+      throw new RuntimeException(
+          String.format("At least one failure happened while processing %s. Look for previous logs for failures",
+              datasetRoot()));
+    }
+  }
+
+
+  private boolean cleanDatasetVersions(List<T> versions, VersionSelectionPolicy<T> selectionPolicy,
+      VersionFinderAndPolicy<T> versionFinderAndPolicy) throws IOException {
+    Collections.sort(versions, Collections.reverseOrder());
+    Collection<T> deletableVersions = selectionPolicy.listSelectedVersions(versions);
+    cleanImpl(deletableVersions);
+    List<DatasetVersion> allVersions = Lists.newArrayList(versions);
+    for (RetentionAction retentionAction : versionFinderAndPolicy.getRetentionActions()) {
+      try {
+        retentionAction.execute(allVersions);
+      } catch (Throwable t) {
+        log.error(String.format("RetentionAction %s failed for dataset %s", retentionAction.getClass().getName(),
+            this.datasetRoot()), t);
+        return false;
Review Comment:
Yes, returning false here only sets the `atLeastOneFailureSeen` flag at line 295. The other dataset retention actions will go through irrespective of returning false here.
`atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;`
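The batching loop in `clean()` above boils down to a standard drain-in-batches pattern; a minimal, self-contained sketch (names such as `drainAll` and `handleBatch` are illustrative stand-ins, not from the PR):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hedged sketch of the batching pattern: drain an iterator in fixed-size
// batches so at most BATCH_SIZE elements are held in memory at any time.
public class BatchDrain {
  static final int BATCH_SIZE = 3;  // stand-in for CLEANABLE_DATASET_BATCH_SIZE

  // Stand-in for cleanDatasetVersions(): reports how many elements it handled.
  static int handleBatch(List<Integer> batch) {
    int handled = batch.size();
    batch.clear();  // crucial: reset the buffer, otherwise it grows unbounded
    return handled;
  }

  static int drainAll(Iterator<Integer> source) {
    List<Integer> batch = new ArrayList<>();
    int processed = 0;
    while (source.hasNext()) {
      batch.add(source.next());
      if (batch.size() >= BATCH_SIZE) {
        processed += handleBatch(batch);
      }
    }
    if (!batch.isEmpty()) {  // flush the final partial batch
      processed += handleBatch(batch);
    }
    return processed;
  }

  public static void main(String[] args) {
    System.out.println(drainAll(List.of(1, 2, 3, 4, 5, 6, 7).iterator()));  // prints 7
  }
}
```

Note that the per-batch handler (or the loop itself) must empty the batch list after each flush; otherwise each "batch" re-processes every earlier element and memory usage is no longer bounded.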
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java:
##########
@@ -86,6 +87,110 @@ public Collection<T> findDatasetVersions(Dataset dataset)
throws IOException {
return dataSetVersions;
}
+  /**
+   * Finds dataset versions under the input {@link org.apache.gobblin.dataset.Dataset}. Dataset versions are
+   * subdirectories of the input dataset's root, each representing a single manageable unit in the dataset.
+   *
+   * @param dataset {@link org.apache.gobblin.dataset.Dataset} whose root directory contains all versions of the dataset
+   * @return an iterator over each dataset version found
+   * @throws IOException if listing the dataset root fails
+   */
+  @Override
+  public RemoteIterator<T> findDatasetVersion(Dataset dataset) throws IOException {
+    FileSystemDataset fsDataset = (FileSystemDataset) dataset;
+    Path versionGlobPath = new Path(fsDataset.datasetRoot(), globVersionPattern());
+    return getDatasetVersionIterator(fsDataset.datasetRoot(), getRegexPattern(versionGlobPath.toString()));
+  }
+
+  /**
+   * Returns an iterator over the dataset versions whose {@link org.apache.hadoop.fs.Path}
+   * starts with the given root and matches the supplied pattern.
+   *
+   * @param root root path under which dataset versions are searched
+   * @param pathPattern regex pattern a dataset version path must match
+   * @return an iterator over the matched dataset versions
+   * @throws IOException if listing the root fails
+   */
+  public RemoteIterator<T> getDatasetVersionIterator(Path root, String pathPattern) throws IOException {
+    Stack<RemoteIterator<FileStatus>> iteratorStack = new Stack<>();
+    RemoteIterator<FileStatus> fsIterator = fs.listStatusIterator(root);
+    iteratorStack.push(fsIterator);
+    return new RemoteIterator<T>() {
+      FileStatus nextFileStatus = null;
+      boolean isNextFileStatusProcessed = false;
+
+      @Override
+      public boolean hasNext() throws IOException {
+        // Reuse the already-fetched element if next() has not consumed it yet;
+        // check this before the stack, which may already be drained
+        if (nextFileStatus != null && !isNextFileStatusProcessed) {
+          return true;
+        }
+        if (iteratorStack.isEmpty()) {
+          return false;
+        }
+        nextFileStatus = fetchNextFileStatus(iteratorStack, pathPattern);
+        isNextFileStatusProcessed = false;
+        return nextFileStatus != null;
+      }
+
+      @Override
+      public T next() throws IOException {
+        if (nextFileStatus == null || isNextFileStatusProcessed) {
+          if (!hasNext()) {
+            throw new NoSuchElementException();
+          }
+        }
+        T datasetVersion = getDatasetVersion(PathUtils.relativizePath(nextFileStatus.getPath(), root), nextFileStatus);
+        isNextFileStatusProcessed = true;
+        return datasetVersion;
+      }
+    };
+  }
+
+  /**
+   * Helper method to find the next {@link FileStatus} matching the given pattern.
+   * A stack keeps track of the {@link FileStatus} iterators returned for each subpath.
+   *
+   * @param iteratorStack stack of pending directory-listing iterators
+   * @param globPattern regex pattern a path must match to be returned
+   * @return the next matching {@link FileStatus}, or null if none remain
+   * @throws IOException if a directory listing fails
+   */
+  private FileStatus fetchNextFileStatus(Stack<RemoteIterator<FileStatus>> iteratorStack, String globPattern)
+      throws IOException {
+    while (!iteratorStack.isEmpty()) {
+      RemoteIterator<FileStatus> latestFsIterator = iteratorStack.pop();
+      while (latestFsIterator.hasNext()) {
+        FileStatus fileStatus = latestFsIterator.next();
+        if (fileStatus.isDirectory()) {
+          iteratorStack.push(fs.listStatusIterator(fileStatus.getPath()));
Review Comment:
Yes, the HDFS FileSystem API doesn't provide a method that returns an iterator over a glob pattern; it only provides one over a file path. So starting from the root, I keep descending into each folder and check every FileStatus against the pattern.
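The traversal described above can be sketched with the plain `java.io.File` API standing in for Hadoop's FileSystem (class and method names here are illustrative, not from the PR):

```java
import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.regex.Pattern;

// Hedged sketch: iterative directory walk with an explicit stack, collecting
// every path that matches a regex (i.e. the glob pattern converted to a regex).
public class StackWalk {
  static List<String> findMatching(File root, Pattern pattern) {
    List<String> matches = new ArrayList<>();
    Deque<File> stack = new ArrayDeque<>();
    stack.push(root);
    while (!stack.isEmpty()) {
      File dir = stack.pop();
      File[] children = dir.listFiles();
      if (children == null) {
        continue;  // not a directory, or listing failed
      }
      for (File child : children) {
        if (child.isDirectory()) {
          stack.push(child);  // descend into subdirectories later
        }
        if (pattern.matcher(child.getPath()).matches()) {
          matches.add(child.getPath());
        }
      }
    }
    return matches;
  }
}
```

The PR's version differs in that it stores `RemoteIterator<FileStatus>` objects on the stack rather than paths, so each directory listing itself stays lazy and only one batch of listing results is pulled from the NameNode at a time.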
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]