This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ef88a58cc [GOBBLIN-2006] Adding an iterator for fetching dataset versions (#3883)
ef88a58cc is described below
commit ef88a58cc8663947ace68cd84694ace2359ccf12
Author: Arpit Varshney <[email protected]>
AuthorDate: Wed Mar 13 12:06:23 2024 +0530
[GOBBLIN-2006] Adding an iterator for fetching dataset versions (#3883)
* Add an iterator to fetch dataset versions
* Address review comments
* Make iterator usage config-driven
---------
Co-authored-by: Arpit Varshney <[email protected]>
---
.../management/policy/NewestKSelectionPolicy.java | 18 ++-
.../dataset/MultiVersionCleanableDatasetBase.java | 150 +++++++++++++--------
.../finder/GlobModTimeDatasetVersionFinder.java | 17 ++-
.../finder/AbstractDatasetVersionFinder.java | 128 ++++++++++++++++--
.../finder/GlobModTimeDatasetVersionFinder.java | 22 ++-
.../management/version/finder/VersionFinder.java | 27 +++-
.../policy/NewestKSelectionPolicyTest.java | 136 +++++++++----------
.../retention/CleanableDatasetBaseTest.java | 78 +++++++----
.../integration/RetentionIntegrationTest.java | 4 +-
.../finder/AbstractDatasetVersionFinderTest.java | 83 ++++++++++++
.../retention.conf | 10 ++
.../selection.conf | 18 +++
.../setup_validate.conf | 22 +++
13 files changed, 539 insertions(+), 174 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicy.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicy.java
index 5d25a6b1b..cf79ecb18 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicy.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicy.java
@@ -33,10 +33,12 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.data.management.version.DatasetVersion;
+import org.apache.gobblin.util.ConfigUtils;
/**
* Select the newest k versions of the dataset.
+ * Newest k version policy can't be used while fetching version via Iterator
*/
@ToString
public class NewestKSelectionPolicy<T extends DatasetVersion> implements
VersionSelectionPolicy<T> {
@@ -61,6 +63,14 @@ public class NewestKSelectionPolicy<T extends
DatasetVersion> implements Version
*/
public static final String NEWEST_K_VERSIONS_NOTSELECTED_KEY =
"selection.newestK.versionsNotSelected";
+ /**
+ * This denotes to use iterator for fetching all the versions of dataset.
+ * This should be set true when the dataset versions has to be pulled in
memory iteratively
+ * If not set, may result into OOM as all the dataset versions are pulled
in-memory
+ * If true, this can't be used along with NewestKSelectionPolicy as
NewestKSelectionPolicy requires all the datasets info
+ */
+ public static final String SHOULD_ITERATE_VERSIONS =
"version.should.iterate";
+
public static final Integer VERSIONS_SELECTED_DEFAULT = 2;
public static final Integer MAX_VERSIONS_ALLOWED = 1000000;
@@ -79,10 +89,14 @@ public class NewestKSelectionPolicy<T extends
DatasetVersion> implements Version
}
static Params createFromConfig(Config config) {
+ if (ConfigUtils.getBoolean(config, SHOULD_ITERATE_VERSIONS, false)) {
+ throw new RuntimeException("NewestKSelection policy can't be used with
an iterator for finding versions");
+ }
if (config.hasPath(NEWEST_K_VERSIONS_SELECTED_KEY)) {
if (config.hasPath(NEWEST_K_VERSIONS_NOTSELECTED_KEY)) {
- throw new RuntimeException("Only one of " +
NEWEST_K_VERSIONS_SELECTED_KEY + " and "
- + NEWEST_K_VERSIONS_NOTSELECTED_KEY + " can be specified.");
+ throw new RuntimeException(
+ "Only one of " + NEWEST_K_VERSIONS_SELECTED_KEY + " and " +
NEWEST_K_VERSIONS_NOTSELECTED_KEY
+ + " can be specified.");
}
return new Params(config.getInt(NEWEST_K_VERSIONS_SELECTED_KEY),
false);
} else if (config.hasPath(NEWEST_K_VERSIONS_NOTSELECTED_KEY)) {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
index 862082230..5c6f9710f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/MultiVersionCleanableDatasetBase.java
@@ -18,17 +18,14 @@
package org.apache.gobblin.data.management.retention.dataset;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.Singular;
-
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
@@ -37,6 +34,11 @@ import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Singular;
+
import
org.apache.gobblin.data.management.policy.EmbeddedRetentionSelectionPolicy;
import org.apache.gobblin.data.management.policy.SelectNothingPolicy;
import org.apache.gobblin.data.management.policy.VersionSelectionPolicy;
@@ -105,8 +107,7 @@ import org.apache.gobblin.util.ConfigUtils;
*
* @param <T> type of {@link FileSystemDatasetVersion} supported by this
{@link CleanableDataset}.
*/
-public abstract class MultiVersionCleanableDatasetBase<T extends
FileSystemDatasetVersion>
- implements CleanableDataset, FileSystemDataset {
+public abstract class MultiVersionCleanableDatasetBase<T extends
FileSystemDatasetVersion> implements CleanableDataset, FileSystemDataset {
/**
* @deprecated in favor of {@link FsCleanableHelper}
@@ -114,6 +115,8 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
@Deprecated
public static final String CONFIGURATION_KEY_PREFIX =
FsCleanableHelper.CONFIGURATION_KEY_PREFIX;
+ public static final Integer CLEANABLE_DATASET_BATCH_SIZE = 100;
+
/**
* @deprecated in favor of {@link FsCleanableHelper}
*/
@@ -181,6 +184,7 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
*/
@Deprecated
protected final boolean deleteAsOwner;
+
/**
* Get {@link
org.apache.gobblin.data.management.retention.policy.RetentionPolicy} to use.
*/
@@ -192,15 +196,17 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
Boolean.valueOf(props.getProperty(SKIP_TRASH_KEY, SKIP_TRASH_DEFAULT)),
Boolean.valueOf(props.getProperty(DELETE_EMPTY_DIRECTORIES_KEY,
DELETE_EMPTY_DIRECTORIES_DEFAULT)),
Boolean.valueOf(props.getProperty(DELETE_AS_OWNER_KEY,
DELETE_AS_OWNER_DEFAULT)),
- ConfigUtils.getBoolean(config, IS_DATASET_BLACKLISTED_KEY,
Boolean.valueOf(IS_DATASET_BLACKLISTED_DEFAULT)), log);
+ ConfigUtils.getBoolean(config, IS_DATASET_BLACKLISTED_KEY,
Boolean.valueOf(IS_DATASET_BLACKLISTED_DEFAULT)),
+ log);
}
- public MultiVersionCleanableDatasetBase(final FileSystem fs, final
Properties props, Logger log) throws IOException {
+ public MultiVersionCleanableDatasetBase(final FileSystem fs, final
Properties props, Logger log)
+ throws IOException {
// This constructor is used by retention jobs configured through job
configs and do not use dataset configs from config store.
// IS_DATASET_BLACKLISTED_KEY is only available with dataset config. Hence
set IS_DATASET_BLACKLISTED_KEY to default
// ...false for jobs running with job configs
- this(fs, props, ConfigFactory.parseMap(ImmutableMap.<String, String>
of(IS_DATASET_BLACKLISTED_KEY,
- IS_DATASET_BLACKLISTED_DEFAULT)), log);
+ this(fs, props, ConfigFactory.parseMap(
+ ImmutableMap.<String, String>of(IS_DATASET_BLACKLISTED_KEY,
IS_DATASET_BLACKLISTED_DEFAULT)), log);
}
/**
@@ -220,7 +226,8 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
boolean deleteEmptyDirectories, boolean deleteAsOwner, boolean
isDatasetBlacklisted, Logger log)
throws IOException {
this.log = log;
- this.fsCleanableHelper = new FsCleanableHelper(fs, properties, simulate,
skipTrash, deleteEmptyDirectories, deleteAsOwner, log);
+ this.fsCleanableHelper =
+ new FsCleanableHelper(fs, properties, simulate, skipTrash,
deleteEmptyDirectories, deleteAsOwner, log);
this.fs = fs;
this.simulate = simulate;
this.skipTrash = skipTrash;
@@ -228,11 +235,11 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
this.trash = this.fsCleanableHelper.getTrash();
this.deleteAsOwner = deleteAsOwner;
this.isDatasetBlacklisted = isDatasetBlacklisted;
-
}
public MultiVersionCleanableDatasetBase(FileSystem fs, Properties
properties, boolean simulate, boolean skipTrash,
- boolean deleteEmptyDirectories, boolean deleteAsOwner, Logger log)
throws IOException {
+ boolean deleteEmptyDirectories, boolean deleteAsOwner, Logger log)
+ throws IOException {
this(fs, properties, simulate, skipTrash, deleteEmptyDirectories,
deleteAsOwner,
Boolean.parseBoolean(IS_DATASET_BLACKLISTED_DEFAULT), log);
}
@@ -241,21 +248,22 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
* Method to perform the Retention operations for this dataset.
*
*<ul>
- * <li>{@link
MultiVersionCleanableDatasetBase#getVersionFindersAndPolicies()} gets a list
{@link VersionFinderAndPolicy}s
- * <li>Each {@link VersionFinderAndPolicy} contains a {@link VersionFinder}
and a {@link VersionSelectionPolicy}. It can
- * optionally have a {@link RetentionAction}
- * <li>The {@link MultiVersionCleanableDatasetBase#clean()} method finds all
the {@link FileSystemDatasetVersion}s using
- * {@link VersionFinderAndPolicy#versionFinder}
- * <li> It gets the deletable {@link FileSystemDatasetVersion}s by applying
{@link VersionFinderAndPolicy#versionSelectionPolicy}.
- * These deletable version are deleted and then deletes empty parent
directories.
- * <li>If additional retention actions are available at {@link
VersionFinderAndPolicy#getRetentionActions()}, all versions
- * found by the {@link VersionFinderAndPolicy#versionFinder} are passed to
{@link RetentionAction#execute(List)} for
- * each {@link RetentionAction}
+ * <li>{@link
MultiVersionCleanableDatasetBase#getVersionFindersAndPolicies()} gets a list
{@link VersionFinderAndPolicy}s
+ * <li>Each {@link VersionFinderAndPolicy} contains a {@link VersionFinder}
and a {@link VersionSelectionPolicy}. It can
+ * optionally have a {@link RetentionAction}
+ * <li>The {@link MultiVersionCleanableDatasetBase#clean()} method finds all
the {@link FileSystemDatasetVersion}s using
+ * {@link VersionFinderAndPolicy#versionFinder}
+ * <li> It gets the deletable {@link FileSystemDatasetVersion}s by applying
{@link VersionFinderAndPolicy#versionSelectionPolicy}.
+ * These deletable version are deleted and then deletes empty parent
directories.
+ * <li>If additional retention actions are available at {@link
VersionFinderAndPolicy#getRetentionActions()}, all versions
+ * found by the {@link VersionFinderAndPolicy#versionFinder} are passed to
{@link RetentionAction#execute(List)} for
+ * each {@link RetentionAction}
* </ul>
*
*/
@Override
- public void clean() throws IOException {
+ public void clean()
+ throws IOException {
if (this.isDatasetBlacklisted) {
this.log.info("Dataset blacklisted. Cleanup skipped for " +
datasetRoot());
@@ -265,7 +273,6 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
boolean atLeastOneFailureSeen = false;
for (VersionFinderAndPolicy<T> versionFinderAndPolicy :
getVersionFindersAndPolicies()) {
-
VersionSelectionPolicy<T> selectionPolicy =
versionFinderAndPolicy.getVersionSelectionPolicy();
VersionFinder<? extends T> versionFinder =
versionFinderAndPolicy.getVersionFinder();
@@ -275,42 +282,68 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
this.log.info(String.format("Cleaning dataset %s. Using version finder
%s and policy %s", this,
versionFinder.getClass().getName(), selectionPolicy));
+ if (versionFinder.useIteratorForFindingVersions()) {
+ // Avoiding OOM by iterating instead of loading all the
datasetVersions in memory
+ RemoteIterator<? extends T> versionRemoteIterator =
versionFinder.findDatasetVersion(this);
+ // Cleaning Dataset versions in batch of CLEANABLE_DATASET_BATCH_SIZE
to avoid OOM
+ List<T> cleanableVersionsBatch = new ArrayList<>();
+ while (versionRemoteIterator.hasNext()) {
+ T version = versionRemoteIterator.next();
+ cleanableVersionsBatch.add(version);
+ if (cleanableVersionsBatch.size() >= CLEANABLE_DATASET_BATCH_SIZE) {
+ boolean isCleanSuccess =
cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy,
+ versionFinderAndPolicy.getRetentionActions());
+ atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
+ }
+ }
+ if (!cleanableVersionsBatch.isEmpty()) {
+ boolean isCleanSuccess =
cleanDatasetVersions(cleanableVersionsBatch, selectionPolicy,
+ versionFinderAndPolicy.getRetentionActions());
+ atLeastOneFailureSeen = atLeastOneFailureSeen || !isCleanSuccess;
+ }
+ } else {
+ List<T> versions =
Lists.newArrayList(versionFinder.findDatasetVersions(this));
- List<T> versions =
Lists.newArrayList(versionFinder.findDatasetVersions(this));
-
- if (versions.isEmpty()) {
- this.log.warn("No dataset version can be found. Ignoring.");
- continue;
- }
-
- Collections.sort(versions, Collections.reverseOrder());
-
- Collection<T> deletableVersions =
selectionPolicy.listSelectedVersions(versions);
-
- cleanImpl(deletableVersions);
-
- List<DatasetVersion> allVersions = Lists.newArrayList();
- for (T ver : versions) {
- allVersions.add(ver);
- }
- for (RetentionAction retentionAction :
versionFinderAndPolicy.getRetentionActions()) {
- try {
- retentionAction.execute(allVersions);
- } catch (Throwable t) {
- atLeastOneFailureSeen = true;
- log.error(String.format("RetentionAction %s failed for dataset %s",
retentionAction.getClass().getName(),
- this.datasetRoot()), t);
+ if (versions.isEmpty()) {
+ this.log.warn("No dataset version can be found. Ignoring.");
+ continue;
}
+ boolean isCleanSuccess =
+ cleanDatasetVersions(versions, selectionPolicy,
versionFinderAndPolicy.getRetentionActions());
+ atLeastOneFailureSeen = !isCleanSuccess || atLeastOneFailureSeen;
}
}
if (atLeastOneFailureSeen) {
- throw new RuntimeException(String.format(
- "At least one failure happened while processing %s. Look for
previous logs for failures", datasetRoot()));
+ throw new RuntimeException(
+ String.format("At least one failure happened while processing %s.
Look for previous logs for failures",
+ datasetRoot()));
+ }
+ }
+
+ private boolean cleanDatasetVersions(List<T> versions,
VersionSelectionPolicy<T> selectionPolicy,
+ List<RetentionAction> retentionActions)
+ throws IOException {
+ boolean isCleanSuccess = true;
+ Collections.sort(versions, Collections.reverseOrder());
+ Collection<T> deletableVersions =
selectionPolicy.listSelectedVersions(versions);
+ cleanImpl(deletableVersions);
+ List<DatasetVersion> allVersions = Lists.newArrayList(versions);
+ for (RetentionAction retentionAction : retentionActions) {
+ try {
+ retentionAction.execute(allVersions);
+ } catch (Throwable t) {
+ log.error(String.format("RetentionAction %s failed for dataset %s",
retentionAction.getClass().getName(),
+ this.datasetRoot()), t);
+ isCleanSuccess = false;
+ }
}
+ versions.clear();
+ return isCleanSuccess;
}
- protected void cleanImpl(Collection<T> deletableVersions) throws IOException
{
+ protected void cleanImpl(Collection<T> deletableVersions)
+ throws IOException {
this.fsCleanableHelper.clean(deletableVersions, this);
}
@@ -343,13 +376,16 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
* @deprecated use {@link VersionFinderAndPolicyBuilder}
*/
@Deprecated
- public VersionFinderAndPolicy(VersionSelectionPolicy<T>
versionSelectionPolicy, VersionFinder<? extends T> versionFinder, Config
config) {
+ public VersionFinderAndPolicy(VersionSelectionPolicy<T>
versionSelectionPolicy,
+ VersionFinder<? extends T> versionFinder, Config config) {
this.versionSelectionPolicy = versionSelectionPolicy;
this.versionFinder = versionFinder;
this.retentionActions = Lists.newArrayList();
this.config = config;
}
- public VersionFinderAndPolicy(RetentionPolicy<T> retentionPolicy,
VersionFinder<? extends T> versionFinder, Config config) {
+
+ public VersionFinderAndPolicy(RetentionPolicy<T> retentionPolicy,
VersionFinder<? extends T> versionFinder,
+ Config config) {
this(new EmbeddedRetentionSelectionPolicy<>(retentionPolicy),
versionFinder, config);
}
@@ -371,8 +407,8 @@ public abstract class MultiVersionCleanableDatasetBase<T
extends FileSystemDatas
} else {
localRetentionActions = Lists.newArrayList(this.retentionActions);
}
- return new VersionFinderAndPolicy<T>(localVersionSelectionPolicy,
this.versionFinder,
- localRetentionActions, this.config);
+ return new VersionFinderAndPolicy<T>(localVersionSelectionPolicy,
this.versionFinder, localRetentionActions,
+ this.config);
}
}
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/version/finder/GlobModTimeDatasetVersionFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/version/finder/GlobModTimeDatasetVersionFinder.java
index 1b8182429..cdcdf481c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/version/finder/GlobModTimeDatasetVersionFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/version/finder/GlobModTimeDatasetVersionFinder.java
@@ -23,6 +23,7 @@ import com.typesafe.config.Config;
import org.apache.gobblin.data.management.retention.version.DatasetVersion;
import
org.apache.gobblin.data.management.retention.version.TimestampedDatasetVersion;
+import org.apache.gobblin.util.ConfigUtils;
/**
@@ -35,14 +36,24 @@ public class GlobModTimeDatasetVersionFinder extends
DatasetVersionFinder<Timest
private final
org.apache.gobblin.data.management.version.finder.GlobModTimeDatasetVersionFinder
realVersionFinder;
private static final String VERSION_FINDER_GLOB_PATTERN_KEY =
"gobblin.retention.version.finder.pattern";
+ /**
+ * This denotes to use iterator for fetching all the versions of dataset.
+ * This should be set true when the dataset versions has to be pulled in
memory iteratively
+ * If not set, may result into OOM as all the dataset versions are pulled
in-memory
+ */
+ private static final String SHOULD_ITERATE_VERSIONS =
"version.should.iterate";
+
public GlobModTimeDatasetVersionFinder(FileSystem fs, Config config) {
- this(fs, config.hasPath(VERSION_FINDER_GLOB_PATTERN_KEY) ? new
Path(config.getString(VERSION_FINDER_GLOB_PATTERN_KEY)) : new Path("*"));
+ this(fs,
+ config.hasPath(VERSION_FINDER_GLOB_PATTERN_KEY) ? new
Path(config.getString(VERSION_FINDER_GLOB_PATTERN_KEY))
+ : new Path("*"), ConfigUtils.getBoolean(config,
SHOULD_ITERATE_VERSIONS, false));
}
- public GlobModTimeDatasetVersionFinder(FileSystem fs, Path globPattern) {
+ public GlobModTimeDatasetVersionFinder(FileSystem fs, Path globPattern,
boolean useIterator) {
super(fs);
this.realVersionFinder =
- new
org.apache.gobblin.data.management.version.finder.GlobModTimeDatasetVersionFinder(fs,
globPattern);
+ new
org.apache.gobblin.data.management.version.finder.GlobModTimeDatasetVersionFinder(fs,
globPattern,
+ useIterator);
}
@Override
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java
index a3b36411f..8991b2dda 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinder.java
@@ -17,22 +17,29 @@
package org.apache.gobblin.data.management.version.finder;
+import com.google.common.collect.Lists;
+
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.Properties;
+import java.util.Stack;
+import java.util.regex.Pattern;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Lists;
-
+import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.FileSystemDataset;
-import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
import org.apache.gobblin.util.PathUtils;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
/**
* Class to find {@link FileSystemDataset} versions in the file system.
@@ -68,7 +75,8 @@ public abstract class AbstractDatasetVersionFinder<T extends
FileSystemDatasetVe
* @throws IOException
*/
@Override
- public Collection<T> findDatasetVersions(Dataset dataset) throws IOException
{
+ public Collection<T> findDatasetVersions(Dataset dataset)
+ throws IOException {
FileSystemDataset fsDataset = (FileSystemDataset) dataset;
Path versionGlobStatus = new Path(fsDataset.datasetRoot(),
globVersionPattern());
FileStatus[] dataSetVersionPaths = this.fs.globStatus(versionGlobStatus);
@@ -86,6 +94,109 @@ public abstract class AbstractDatasetVersionFinder<T
extends FileSystemDatasetVe
return dataSetVersions;
}
+ /**
+ * Find dataset version in the input {@link org.apache.gobblin.dataset}.
Dataset versions are subdirectories of the
+ * input {@link org.apache.gobblin.dataset} representing a single manageable
unit in the dataset.
+ *
+ * @param dataset {@link org.apache.gobblin.dataset} to directory containing
all versions of a dataset
+ * @return - Returns an iterator for fetching each dataset version found.
+ * @throws IOException
+ */
+ @Override
+ public RemoteIterator<T> findDatasetVersion(Dataset dataset)
+ throws IOException {
+ FileSystemDataset fsDataset = (FileSystemDataset) dataset;
+ Path versionGlobStatus = new Path(fsDataset.datasetRoot(),
globVersionPattern());
+
+ return getDatasetVersionIterator(fsDataset.datasetRoot(),
getRegexPattern(versionGlobStatus.toString()));
+ }
+
+ /**
+ * Returns an iterator to fetch the dataset versions for the datasets whose
path {@link org.apache.hadoop.fs.Path}
+ * starts with the root and matches the globPattern passed
+ *
+ * @param root - Path of the root from which the Dataset Versions have to be
returned
+ * @param pathPattern - Pattern to match the dataset version path
+ * @return - an iterator of matched data versions
+ * @throws IOException
+ */
+ public RemoteIterator<T> getDatasetVersionIterator(Path root, String
pathPattern)
+ throws IOException {
+ Stack<RemoteIterator<FileStatus>> iteratorStack = new Stack<>();
+ RemoteIterator<FileStatus> fsIterator = fs.listStatusIterator(root);
+ iteratorStack.push(fsIterator);
+ return new RemoteIterator<T>() {
+ Pair<FileStatus, Boolean> nextFileStatus = new MutablePair<>();
+
+ @Override
+ public boolean hasNext()
+ throws IOException {
+ if (iteratorStack.isEmpty()) {
+ return false;
+ }
+ // No need to process if the next() has not been called
+ if (nextFileStatus.getKey() != null && !nextFileStatus.getValue()) {
+ return true;
+ }
+ nextFileStatus = new MutablePair<>(fetchNextFileStatus(iteratorStack,
pathPattern), false);
+ return nextFileStatus.getKey() != null;
+ }
+
+ @Override
+ public T next()
+ throws IOException {
+ if (nextFileStatus.getKey() == null || nextFileStatus.getValue()) {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ }
+ T datasetVersion =
getDatasetVersion(PathUtils.relativizePath(nextFileStatus.getKey().getPath(),
root),
+ nextFileStatus.getKey());
+ nextFileStatus.setValue(true);
+ return datasetVersion;
+ }
+ };
+ }
+
+ /**
+ * Helper method to find the next filestatus matching the globPattern.
+ * This uses a stack to keep track of the fileStatusIterator returned at
each subpaths
+ *
+ * @param iteratorStack
+ * @param globPattern
+ * @return
+ * @throws IOException
+ */
+ private FileStatus fetchNextFileStatus(Stack<RemoteIterator<FileStatus>>
iteratorStack, String globPattern)
+ throws IOException {
+ while (!iteratorStack.isEmpty()) {
+ RemoteIterator<FileStatus> latestfsIterator = iteratorStack.pop();
+ while (latestfsIterator.hasNext()) {
+ FileStatus fileStatus = latestfsIterator.next();
+ if (fileStatus.isDirectory()) {
+ iteratorStack.push(fs.listStatusIterator(fileStatus.getPath()));
+ }
+ if (Pattern.matches(globPattern,
fileStatus.getPath().toUri().getPath())) {
+ if (latestfsIterator.hasNext()) {
+ // Pushing back the current file status iterator before returning
as there are more files to be processed
+ iteratorStack.push(latestfsIterator);
+ }
+ return fileStatus;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Converting a globPatter to a regex pattern to match the file status path
+ *
+ * @return
+ */
+ private static String getRegexPattern(String globPattern) {
+ return GlobPattern.compile(globPattern).pattern().replaceAll("\\.\\*",
"[^/]*");
+ }
+
/**
* Should return class of T.
*/
@@ -107,5 +218,4 @@ public abstract class AbstractDatasetVersionFinder<T
extends FileSystemDatasetVe
* @return {@link org.apache.gobblin.data.management.version.DatasetVersion}
for that {@link FileStatus}.
*/
public abstract T getDatasetVersion(Path pathRelativeToDatasetRoot,
FileStatus versionFileStatus);
-
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/GlobModTimeDatasetVersionFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/GlobModTimeDatasetVersionFinder.java
index c71eb2d5c..ba863ee94 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/GlobModTimeDatasetVersionFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/GlobModTimeDatasetVersionFinder.java
@@ -29,6 +29,7 @@ import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
+import org.apache.gobblin.util.ConfigUtils;
/**
@@ -37,21 +38,31 @@ import
org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
public class GlobModTimeDatasetVersionFinder extends
DatasetVersionFinder<TimestampedDatasetVersion> {
private final Path globPattern;
+ private boolean useIterator;
private static final String VERSION_FINDER_GLOB_PATTERN_KEY =
"version.globPattern";
+ /**
+ * This denotes to use iterator for fetching all the versions of dataset.
+ * This should be set true when the dataset versions has to be pulled in
memory iteratively
+ * If not set, may result into OOM as all the dataset versions are pulled
in-memory
+ */
+ private static final String SHOULD_ITERATE_VERSIONS =
"version.should.iterate";
+
public GlobModTimeDatasetVersionFinder(FileSystem fs, Config config) {
- this(fs, config.hasPath(VERSION_FINDER_GLOB_PATTERN_KEY)
- ? new Path(config.getString(VERSION_FINDER_GLOB_PATTERN_KEY)) : new
Path("*"));
+ this(fs,
+ config.hasPath(VERSION_FINDER_GLOB_PATTERN_KEY) ? new
Path(config.getString(VERSION_FINDER_GLOB_PATTERN_KEY))
+ : new Path("*"), ConfigUtils.getBoolean(config,
SHOULD_ITERATE_VERSIONS, false));
}
public GlobModTimeDatasetVersionFinder(FileSystem fs, Properties props) {
this(fs, ConfigFactory.parseProperties(props));
}
- public GlobModTimeDatasetVersionFinder(FileSystem fs, Path globPattern) {
+ public GlobModTimeDatasetVersionFinder(FileSystem fs, Path globPattern,
boolean useIterator) {
super(fs);
this.globPattern = globPattern;
+ this.useIterator = useIterator;
}
@Override
@@ -64,6 +75,11 @@ public class GlobModTimeDatasetVersionFinder extends
DatasetVersionFinder<Timest
return this.globPattern;
}
+ @Override
+ public boolean useIteratorForFindingVersions(){
+ return this.useIterator;
+ }
+
@Override
public TimestampedDatasetVersion getDatasetVersion(Path
pathRelativeToDatasetRoot, Path fullPath) {
try {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/VersionFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/VersionFinder.java
index 7b8b14f12..619fba509 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/VersionFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/VersionFinder.java
@@ -23,6 +23,8 @@ import java.util.Collection;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.data.management.version.DatasetVersion;
+import org.apache.hadoop.fs.RemoteIterator;
+
/**
* Finds dataset versions.
@@ -43,5 +45,28 @@ public interface VersionFinder<T extends DatasetVersion> {
* @return Collection of {@link DatasetVersion} for each dataset version
found.
* @throws IOException
*/
- public Collection<T> findDatasetVersions(Dataset dataset) throws IOException;
+ public Collection<T> findDatasetVersions(Dataset dataset)
+ throws IOException;
+
+ /**
+ * Find dataset versions for {@link Dataset}. Each dataset versions
represents a single manageable unit in the dataset.
+ *
+ * @param dataset
+ * @return - an iterator
+ * @throws IOException
+ */
+ public default RemoteIterator<T> findDatasetVersion(Dataset dataset)
+ throws IOException {
+ return (RemoteIterator<T>)
findDatasetVersions(dataset).stream().iterator();
+ }
+
+ /**
+ * Returns a boolean to identify whether to use iteratorVersion or
Collections
+ * for finding dataset versions
+ *
+ * @return
+ */
+ public default boolean useIteratorForFindingVersions() {
+ return false;
+ }
}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicyTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicyTest.java
index d756ddc61..7f2670714 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicyTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/NewestKSelectionPolicyTest.java
@@ -31,57 +31,40 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.data.management.version.DatasetVersion;
+
/** Unit tests for {@link NewestKSelectionPolicy} */
public class NewestKSelectionPolicyTest {
private static final Map<String, Map<String, Integer>> TEST_CONFIGS =
- ImmutableMap.<String, Map<String, Integer>>builder()
- .put("empty", ImmutableMap.<String, Integer>builder().build())
- .put("selectedPos", ImmutableMap.<String, Integer>builder()
-
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 5)
- .build())
- .put("notSelectedPos", ImmutableMap.<String, Integer>builder()
-
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 10)
- .build())
- .build();
+ ImmutableMap.<String, Map<String, Integer>>builder().put("empty",
ImmutableMap.<String, Integer>builder().build())
+ .put("selectedPos",
+ ImmutableMap.<String,
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 5)
+ .build()).put("notSelectedPos",
+ ImmutableMap.<String,
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
10)
+ .build()).build();
private static final Map<String, Map<String, Integer>> NEG_TEST_CONFIGS =
- ImmutableMap.<String, Map<String, Integer>>builder()
- .put("bothProps", ImmutableMap.<String, Integer>builder()
-
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 5)
-
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 5)
- .build())
- .put("selectedNeg", ImmutableMap.<String, Integer>builder()
-
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, -5)
- .build())
- .put("notSelectedNeg", ImmutableMap.<String, Integer>builder()
-
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, -1)
- .build())
- .put("selectedBig", ImmutableMap.<String, Integer>builder()
-
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY,
- NewestKSelectionPolicy.MAX_VERSIONS_ALLOWED + 1)
- .build())
- .put("notSelectedBig", ImmutableMap.<String, Integer>builder()
-
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
- NewestKSelectionPolicy.MAX_VERSIONS_ALLOWED + 1)
- .build())
- .put("selected0", ImmutableMap.<String, Integer>builder()
-
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 0)
- .build())
- .put("notSelected0", ImmutableMap.<String, Integer>builder()
-
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY, 0)
- .build())
- .build();
+ ImmutableMap.<String, Map<String, Integer>>builder().put("bothProps",
+ ImmutableMap.<String,
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, 5)
+
.put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
5).build()).put("selectedNeg",
+ ImmutableMap.<String,
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY, -5)
+ .build()).put("notSelectedNeg",
+ ImmutableMap.<String,
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
-1)
+ .build()).put("selectedBig", ImmutableMap.<String,
Integer>builder()
+ .put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY,
NewestKSelectionPolicy.MAX_VERSIONS_ALLOWED + 1)
+ .build()).put("notSelectedBig", ImmutableMap.<String,
Integer>builder()
+ .put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
+ NewestKSelectionPolicy.MAX_VERSIONS_ALLOWED +
1).build()).put("selected0",
+ ImmutableMap.<String,
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY,
0).build())
+ .put("notSelected0",
+ ImmutableMap.<String,
Integer>builder().put(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
0)
+ .build()).build();
private static final Map<String, Integer> TEST_RESULTS =
- ImmutableMap.<String, Integer>builder()
- .put("empty", NewestKSelectionPolicy.VERSIONS_SELECTED_DEFAULT)
- .put("selectedPos", 5)
- .put("notSelectedPos", -10)
- .build();
-
- public static class TestStringDatasetVersion implements DatasetVersion,
-
Comparable<DatasetVersion> {
+ ImmutableMap.<String, Integer>builder().put("empty",
NewestKSelectionPolicy.VERSIONS_SELECTED_DEFAULT)
+ .put("selectedPos", 5).put("notSelectedPos", -10).build();
+
+ public static class TestStringDatasetVersion implements DatasetVersion,
Comparable<DatasetVersion> {
private String _version;
public TestStringDatasetVersion(String version) {
@@ -93,69 +76,77 @@ public class NewestKSelectionPolicyTest {
if (!(o instanceof TestStringDatasetVersion)) {
throw new RuntimeException("Incompatible version: " + o);
}
- return _version.compareTo(((TestStringDatasetVersion)o)._version);
+ return _version.compareTo(((TestStringDatasetVersion) o)._version);
}
@Override
public Object getVersion() {
return _version;
}
-
}
@Test
public void testCreationProps() {
- for(Map.Entry<String, Map<String, Integer>> test: TEST_CONFIGS.entrySet())
{
+ for (Map.Entry<String, Map<String, Integer>> test :
TEST_CONFIGS.entrySet()) {
String testName = test.getKey();
Properties testProps = new Properties();
- for (Map.Entry<String, Integer> prop: test.getValue().entrySet()) {
+ for (Map.Entry<String, Integer> prop : test.getValue().entrySet()) {
testProps.setProperty(prop.getKey(), prop.getValue().toString());
}
NewestKSelectionPolicy policy = new NewestKSelectionPolicy(testProps);
- Assert.assertEquals(policy.getVersionsSelected(),
- Math.abs(TEST_RESULTS.get(testName).intValue()),
- "Failure for test " + testName);
+ Assert.assertEquals(policy.getVersionsSelected(),
Math.abs(TEST_RESULTS.get(testName).intValue()),
+ "Failure for test " + testName);
Assert.assertEquals(policy.isExcludeMode(),
TEST_RESULTS.get(testName).intValue() < 0,
- "Failure for test " + testName);
+ "Failure for test " + testName);
}
- for(Map.Entry<String, Map<String, Integer>> test:
NEG_TEST_CONFIGS.entrySet()) {
+ for (Map.Entry<String, Map<String, Integer>> test :
NEG_TEST_CONFIGS.entrySet()) {
String testName = test.getKey();
Properties testProps = new Properties();
- for (Map.Entry<String, Integer> prop: test.getValue().entrySet()) {
+ for (Map.Entry<String, Integer> prop : test.getValue().entrySet()) {
testProps.setProperty(prop.getKey(), prop.getValue().toString());
}
try {
new NewestKSelectionPolicy(testProps);
Assert.fail("Exception expected for test " + testName);
- }
- catch (RuntimeException e) {
+ } catch (RuntimeException e) {
//OK
}
}
}
+  @Test
+  public void testCreationProps_useIteratorException() {
+    // Every config is expected to be rejected once iterator-based version listing is enabled.
+    // Each case is checked explicitly: with expectedExceptions on the annotation, the first
+    // constructor call would end the test and the remaining configs would never run.
+    for (Map.Entry<String, Map<String, Integer>> test : TEST_CONFIGS.entrySet()) {
+      String testName = test.getKey();
+      Properties testProps = new Properties();
+      for (Map.Entry<String, Integer> prop : test.getValue().entrySet()) {
+        testProps.setProperty(prop.getKey(), prop.getValue().toString());
+      }
+      testProps.setProperty(NewestKSelectionPolicy.SHOULD_ITERATE_VERSIONS, "true");
+      try {
+        new NewestKSelectionPolicy(testProps);
+        Assert.fail("Exception expected for test " + testName);
+      } catch (RuntimeException e) {
+        // OK
+      }
+    }
+  }
+
@Test
public void testCreationConfig() {
- for(Map.Entry<String, Map<String, Integer>> test: TEST_CONFIGS.entrySet())
{
+ for (Map.Entry<String, Map<String, Integer>> test :
TEST_CONFIGS.entrySet()) {
String testName = test.getKey();
Config conf = ConfigFactory.parseMap(test.getValue());
NewestKSelectionPolicy policy = new NewestKSelectionPolicy(conf);
- Assert.assertEquals(policy.getVersionsSelected(),
- Math.abs(TEST_RESULTS.get(testName).intValue()),
- "Failure for test " + testName);
+ Assert.assertEquals(policy.getVersionsSelected(),
Math.abs(TEST_RESULTS.get(testName).intValue()),
+ "Failure for test " + testName);
Assert.assertEquals(policy.isExcludeMode(),
TEST_RESULTS.get(testName).intValue() < 0,
- "Failure for test " + testName);
+ "Failure for test " + testName);
}
- for(Map.Entry<String, Map<String, Integer>> test:
NEG_TEST_CONFIGS.entrySet()) {
+ for (Map.Entry<String, Map<String, Integer>> test :
NEG_TEST_CONFIGS.entrySet()) {
String testName = test.getKey();
Config conf = ConfigFactory.parseMap(test.getValue());
try {
new NewestKSelectionPolicy(conf);
Assert.fail("Exception expected for test " + testName);
- }
- catch (RuntimeException e) {
+ } catch (RuntimeException e) {
// OK
}
}
@@ -170,47 +161,42 @@ public class NewestKSelectionPolicyTest {
//selectedVersions 5 < 10
Config conf = ConfigFactory.empty()
- .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY,
- ConfigValueFactory.fromAnyRef(5));
+ .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY,
ConfigValueFactory.fromAnyRef(5));
NewestKSelectionPolicy policy = new NewestKSelectionPolicy(conf);
Collection<DatasetVersion> res = policy.listSelectedVersions(versions);
int idx = 0;
Assert.assertEquals(res.size(), policy.getVersionsSelected());
- for (DatasetVersion v: res) {
+ for (DatasetVersion v : res) {
Assert.assertEquals(v, versions.get(idx++), "Mismatch for index " + idx);
}
//selectedVersions 15 > 10
conf = ConfigFactory.empty()
- .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY,
- ConfigValueFactory.fromAnyRef(15));
+ .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_SELECTED_KEY,
ConfigValueFactory.fromAnyRef(15));
policy = new NewestKSelectionPolicy(conf);
res = policy.listSelectedVersions(versions);
idx = 0;
Assert.assertEquals(res.size(), versions.size());
- for (DatasetVersion v: res) {
+ for (DatasetVersion v : res) {
Assert.assertEquals(v, versions.get(idx++), "Mismatch for index " + idx);
}
//notSelectedVersions 4 < 10
conf = ConfigFactory.empty()
- .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
- ConfigValueFactory.fromAnyRef(4));
+ .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
ConfigValueFactory.fromAnyRef(4));
policy = new NewestKSelectionPolicy(conf);
res = policy.listSelectedVersions(versions);
idx = policy.getVersionsSelected();
Assert.assertEquals(res.size(), versions.size() -
policy.getVersionsSelected());
- for (DatasetVersion v: res) {
+ for (DatasetVersion v : res) {
Assert.assertEquals(v, versions.get(idx++), "Mismatch for index " + idx);
}
//notSelectedVersions 14 > 10
conf = ConfigFactory.empty()
- .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
- ConfigValueFactory.fromAnyRef(14));
+ .withValue(NewestKSelectionPolicy.NEWEST_K_VERSIONS_NOTSELECTED_KEY,
ConfigValueFactory.fromAnyRef(14));
policy = new NewestKSelectionPolicy(conf);
res = policy.listSelectedVersions(versions);
Assert.assertEquals(res.size(), 0);
}
-
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetBaseTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetBaseTest.java
index 4d3fc3b04..58f8a3976 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetBaseTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableDatasetBaseTest.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -45,8 +46,8 @@ import org.apache.gobblin.data.management.trash.Trash;
public class CleanableDatasetBaseTest {
@Test
- public void test() throws IOException {
-
+ public void test()
+ throws IOException {
FileSystem fs = mock(FileSystem.class);
Path datasetRoot = new Path("/test/dataset");
@@ -59,19 +60,21 @@ public class CleanableDatasetBaseTest {
DatasetImpl dataset = new DatasetImpl(fs, false, false, false, false,
datasetRoot);
- when(dataset.versionFinder.findDatasetVersions(dataset)).
- thenReturn(Lists.newArrayList(dataset1Version1, dataset1Version2));
+
when(dataset.versionFinder.useIteratorForFindingVersions()).thenReturn(false);
+
+ when(dataset.versionFinder.findDatasetVersions(dataset)).thenReturn(
+ Lists.newArrayList(dataset1Version1, dataset1Version2));
dataset.clean();
Assert.assertEquals(dataset.getTrash().getDeleteOperations().size(), 1);
Assert.assertTrue(dataset.getTrash().getDeleteOperations().get(0).getPath()
.equals(dataset1Version2.getPathsToDelete().iterator().next()));
-
}
@Test
- public void testSkipTrash() throws IOException {
+ public void testSkipTrash()
+ throws IOException {
FileSystem fs = mock(FileSystem.class);
Trash trash = mock(Trash.class);
@@ -86,8 +89,8 @@ public class CleanableDatasetBaseTest {
when(fs.exists(any(Path.class))).thenReturn(true);
DatasetImpl dataset = new DatasetImpl(fs, false, true, false, false,
datasetRoot);
- when(dataset.versionFinder.findDatasetVersions(dataset)).
- thenReturn(Lists.newArrayList(dataset1Version1, dataset1Version2));
+ when(dataset.versionFinder.findDatasetVersions(dataset)).thenReturn(
+ Lists.newArrayList(dataset1Version1, dataset1Version2));
dataset.clean();
@@ -96,15 +99,14 @@ public class CleanableDatasetBaseTest {
.equals(dataset1Version2.getPathsToDelete().iterator().next()));
Assert.assertTrue(dataset.getTrash().isSkipTrash());
-
}
@Test
- public void testSimulate() throws IOException {
+ public void testSimulate()
+ throws IOException {
FileSystem fs = mock(FileSystem.class);
Trash trash = mock(Trash.class);
-
Path datasetRoot = new Path("/test/dataset");
DatasetVersion dataset1Version1 = new StringDatasetVersion("version1", new
Path(datasetRoot, "version1"));
@@ -115,8 +117,8 @@ public class CleanableDatasetBaseTest {
when(fs.exists(any(Path.class))).thenReturn(true);
DatasetImpl dataset = new DatasetImpl(fs, true, false, false, false,
datasetRoot);
- when(dataset.versionFinder.findDatasetVersions(dataset)).
- thenReturn(Lists.newArrayList(dataset1Version1, dataset1Version2));
+ when(dataset.versionFinder.findDatasetVersions(dataset)).thenReturn(
+ Lists.newArrayList(dataset1Version1, dataset1Version2));
dataset.clean();
@@ -125,11 +127,11 @@ public class CleanableDatasetBaseTest {
.equals(dataset1Version2.getPathsToDelete().iterator().next()));
Assert.assertTrue(dataset.getTrash().isSimulate());
-
}
@Test
- public void testDeleteEmptyDirectories() throws IOException {
+ public void testDeleteEmptyDirectories()
+ throws IOException {
FileSystem fs = mock(FileSystem.class);
Trash trash = mock(Trash.class);
@@ -142,12 +144,9 @@ public class CleanableDatasetBaseTest {
when(trash.moveToTrash(any(Path.class))).thenReturn(true);
when(fs.exists(any(Path.class))).thenReturn(true);
DatasetImpl dataset = new DatasetImpl(fs, false, false, true, false,
datasetRoot);
-
- when(dataset.versionFinder.findDatasetVersions(dataset)).
- thenReturn(Lists.newArrayList(dataset1Version1, dataset1Version2));
-
when(fs.listStatus(any(Path.class))).thenReturn(new FileStatus[]{});
-
+ when(dataset.versionFinder.findDatasetVersions(dataset)).thenReturn(
+ Lists.newArrayList(dataset1Version1, dataset1Version2));
dataset.clean();
Assert.assertEquals(dataset.getTrash().getDeleteOperations().size(), 1);
@@ -158,7 +157,40 @@ public class CleanableDatasetBaseTest {
verify(fs).delete(dataset1Version2.getPathsToDelete().iterator().next().getParent(),
false);
verify(fs, times(1)).delete(any(Path.class), eq(false));
verify(fs, never()).delete(any(Path.class), eq(true));
+ }
+
+  @Test
+  public void testCleanIteratorFinder()
+      throws IOException {
+    FileSystem fs = mock(FileSystem.class);
+
+    Path datasetRoot = new Path("/test/dataset");
+
+    DatasetVersion dataset1Version1 = new StringDatasetVersion("version1", new Path(datasetRoot, "version1"));
+    DatasetVersion dataset1Version2 = new StringDatasetVersion("version2", new Path(datasetRoot, "version2"));
+
+    when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+    when(fs.exists(any(Path.class))).thenReturn(true);
+
+    DatasetImpl dataset = new DatasetImpl(fs, false, false, false, false, datasetRoot);
+
+    // Select the RemoteIterator-based lookup; versions are served by the iterator mocked below.
+    when(dataset.versionFinder.useIteratorForFindingVersions()).thenReturn(true);
+    mockDatasetVersion(dataset, dataset1Version1, dataset1Version2);
+    dataset.clean();
+
+    Assert.assertEquals(dataset.getTrash().getDeleteOperations().size(), 1);
+    Assert.assertTrue(dataset.getTrash().getDeleteOperations().get(0).getPath()
+        .equals(dataset1Version2.getPathsToDelete().iterator().next()));
+  }
+
+  /**
+   * Stubs {@code dataset.versionFinder.findDatasetVersion(dataset)} to return a mocked
+   * {@link RemoteIterator} that yields {@code dataset1Version1} and then {@code dataset1Version2}.
+   */
+  private void mockDatasetVersion(DatasetImpl dataset, DatasetVersion dataset1Version1,
+      DatasetVersion dataset1Version2)
+      throws IOException {
+    // mock(Class) can only produce a raw RemoteIterator; the element type is fixed by the stubs below.
+    @SuppressWarnings("unchecked")
+    RemoteIterator<DatasetVersion> versionRemoteIterator = mock(RemoteIterator.class);
+    when(dataset.versionFinder.findDatasetVersion(dataset)).thenReturn(versionRemoteIterator);
+    when(versionRemoteIterator.hasNext()).thenReturn(true, true, false);
+    when(versionRemoteIterator.next()).thenReturn(dataset1Version1).thenReturn(dataset1Version2);
+  }
private class DeleteFirstRetentionPolicy implements
RetentionPolicy<StringDatasetVersion> {
@@ -179,8 +211,9 @@ public class CleanableDatasetBaseTest {
public RetentionPolicy retentionPolicy = new DeleteFirstRetentionPolicy();
public Path path;
- public DatasetImpl(FileSystem fs, boolean simulate, boolean skipTrash,
- boolean deleteEmptyDirectories, boolean deleteAsOwner, Path path)
throws IOException {
+ public DatasetImpl(FileSystem fs, boolean simulate, boolean skipTrash,
boolean deleteEmptyDirectories,
+ boolean deleteAsOwner, Path path)
+ throws IOException {
super(fs, TestTrash.propertiesForTestTrash(), simulate, skipTrash,
deleteEmptyDirectories, deleteAsOwner,
LoggerFactory.getLogger(DatasetImpl.class));
when(versionFinder.versionClass()).thenReturn(StringDatasetVersion.class);
@@ -206,5 +239,4 @@ public class CleanableDatasetBaseTest {
return (TestTrash) this.trash;
}
}
-
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/RetentionIntegrationTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/RetentionIntegrationTest.java
index 68c25f4f1..f5d99997c 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/RetentionIntegrationTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/integration/RetentionIntegrationTest.java
@@ -97,7 +97,9 @@ public class RetentionIntegrationTest {
{ "testCombinePolicy", "retention.job" },
{ "testCombinePolicy", "selection.conf" },
{ "testTimeBasedAccessControl", "selection.conf" },
- { "testMultiVersionAccessControl",
"daily-retention-with-accessControl.conf" }
+ { "testMultiVersionAccessControl",
"daily-retention-with-accessControl.conf" },
+ { "testTimeBasedRetentionWithVersionIterator", "retention.conf"},
+ { "testTimeBasedRetentionWithVersionIterator", "selection.conf" }
};
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinderTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinderTest.java
new file mode 100644
index 000000000..48f39ed16
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/version/finder/AbstractDatasetVersionFinderTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.version.finder;
+
+import com.typesafe.config.ConfigFactory;
+
+import java.io.IOException;
+
+import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+@Test(groups = {"gobblin.data.management.version"})
+public class AbstractDatasetVersionFinderTest {
+
+  private final FileSystem fs = Mockito.mock(FileSystem.class);
+
+  private final AbstractDatasetVersionFinder _abstractDatasetVersionFinder =
+      new GlobModTimeDatasetVersionFinder(fs, ConfigFactory.empty());
+
+  /**
+   * Mocks the directory tree {@code /root -> /root/dir1 -> /root/dir1/dir2} and checks that
+   * {@code getDatasetVersionIterator} yields exactly one version, {@code /root/dir1/dir2}.
+   */
+  @Test
+  @SuppressWarnings("unchecked") // Mockito can only create raw RemoteIterator mocks.
+  void testGetDatasetVersionIterator()
+      throws IOException {
+    Path root = new Path("/root");
+    String globPattern = "/root/.*/.*";
+
+    FileStatus dir1 = new FileStatus(0, true, 1, 0, 123141, new Path("/root/dir1"));
+    FileStatus dir2 = new FileStatus(0, true, 1, 0, 124124, new Path("/root/dir1/dir2"));
+
+    RemoteIterator<FileStatus> rootIterator = Mockito.mock(RemoteIterator.class);
+    when(fs.listStatusIterator(root)).thenReturn(rootIterator);
+    when(rootIterator.hasNext()).thenReturn(true, false);
+    when(rootIterator.next()).thenReturn(dir1);
+
+    RemoteIterator<FileStatus> dir1Iterator = Mockito.mock(RemoteIterator.class);
+    when(fs.listStatusIterator(dir1.getPath())).thenReturn(dir1Iterator);
+    when(dir1Iterator.hasNext()).thenReturn(true, false);
+    when(dir1Iterator.next()).thenReturn(dir2);
+
+    RemoteIterator<FileStatus> dir2Iterator = Mockito.mock(RemoteIterator.class);
+    when(fs.listStatusIterator(dir2.getPath())).thenReturn(dir2Iterator);
+    when(dir2Iterator.hasNext()).thenReturn(false);
+
+    when(fs.getFileStatus(dir1.getPath())).thenReturn(dir1);
+    when(fs.getFileStatus(dir2.getPath())).thenReturn(dir2);
+
+    RemoteIterator<TimestampedDatasetVersion> resultIterator =
+        _abstractDatasetVersionFinder.getDatasetVersionIterator(root, globPattern);
+
+    // Verify result. TestNG's Assert.assertEquals takes (actual, expected).
+    Assert.assertTrue(resultIterator.hasNext());
+    Assert.assertEquals(resultIterator.next().getPath(), dir2.getPath());
+    Assert.assertFalse(resultIterator.hasNext());
+
+    // Verify interactions with mocks
+    verify(fs, times(1)).listStatusIterator(root);
+    verify(rootIterator, times(2)).hasNext();
+    verify(rootIterator, times(1)).next();
+  }
+}
diff --git
a/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/retention.conf
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/retention.conf
new file mode 100644
index 000000000..1a8c998c2
--- /dev/null
+++
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/retention.conf
@@ -0,0 +1,10 @@
+gobblin.retention : {
+ # ${testNameTempPath} is resolved at runtime by the test
+ dataset.pattern=${testNameTempPath}"/user/gobblin/*"
+ timebased.duration = P7D
+
+
dataset.finder.class=org.apache.gobblin.data.management.retention.profile.ManagedCleanableDatasetFinder
+
retention.policy.class=org.apache.gobblin.data.management.retention.policy.TimeBasedRetentionPolicy
+
version.finder.class=org.apache.gobblin.data.management.retention.version.finder.GlobModTimeDatasetVersionFinder
+ version.finder.iterator=true
+}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/selection.conf
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/selection.conf
new file mode 100644
index 000000000..69ded90c4
--- /dev/null
+++
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/selection.conf
@@ -0,0 +1,18 @@
+gobblin.retention : {
+
+ dataset : {
+ # ${testNameTempPath} is resolved at runtime by the test
+ pattern=${testNameTempPath}"/user/gobblin/*"
+
finder.class=org.apache.gobblin.data.management.retention.profile.ManagedCleanableDatasetFinder
+ }
+
+ selection : {
+
policy.class=org.apache.gobblin.data.management.policy.SelectBeforeTimeBasedPolicy
+ timeBased.lookbackTime=7d
+ }
+
+ version : {
+
finder.class=org.apache.gobblin.data.management.version.finder.GlobModTimeDatasetVersionFinder
+ finder.iterator=true
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/setup_validate.conf
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/setup_validate.conf
new file mode 100644
index 000000000..0f2e08cdb
--- /dev/null
+++
b/gobblin-data-management/src/test/resources/retentionIntegrationTest/testTimeBasedRetentionWithVersionIterator/setup_validate.conf
@@ -0,0 +1,22 @@
+gobblin.test : {
+ currentTime : "02/19/2016 11:00:00"
+ create : [
+ {path:"/user/gobblin/Dataset1/Version1", modTime:"02/10/2016 10:00:00"},
+ {path:"/user/gobblin/Dataset1/Version2", modTime:"02/11/2016 10:00:00"},
+ {path:"/user/gobblin/Dataset1/Version3", modTime:"02/12/2016 10:00:00"},
+ {path:"/user/gobblin/Dataset1/Version4", modTime:"02/13/2016 10:00:00"},
+ {path:"/user/gobblin/Dataset1/Version5", modTime:"02/14/2016 10:00:00"}
+ ]
+
+ validate : {
+ retained : [
+ {path:"/user/gobblin/Dataset1/Version4", modTime:"02/13/2016
10:00:00"},
+ {path:"/user/gobblin/Dataset1/Version5", modTime:"02/14/2016 10:00:00"}
+ ]
+ deleted : [
+ {path:"/user/gobblin/Dataset1/Version1", modTime:"02/10/2016
10:00:00"},
+ {path:"/user/gobblin/Dataset1/Version2", modTime:"02/11/2016
10:00:00"},
+ {path:"/user/gobblin/Dataset1/Version3", modTime:"02/12/2016 10:00:00"}
+ ]
+ }
+}
\ No newline at end of file