This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bf3357f [GOBBLIN-1302] add COMBINE_RETENTION_POLICIES config
bf3357f is described below
commit bf3357f7502017d68e974256cdd5b086bdfc8bc5
Author: Arjun <[email protected]>
AuthorDate: Mon Oct 26 15:43:07 2020 -0700
[GOBBLIN-1302] add COMBINE_RETENTION_POLICIES config
Closes #3140 from
arjun4084346/combineRetentionPolicyConfigs
---
.../retention/policy/CombineRetentionPolicy.java | 50 +++++++++++++-------
.../retention/policy/DeleteAllRetentionPolicy.java | 2 +
.../policy/DeleteNothingRetentionPolicy.java | 2 +
.../retention/policy/NewestKRetentionPolicy.java | 2 +
.../retention/policy/PredicateRetentionPolicy.java | 2 +
.../retention/policy/TimeBasedRetentionPolicy.java | 2 +
.../retention/CombineRetentionPolicyTest.java | 53 ++++++++++++++--------
.../retention/test/ContainsARetentionPolicy.java | 3 ++
.../org/apache/gobblin/util/PropertiesUtils.java | 14 ++++++
.../apache/gobblin/util/PropertiesUtilsTest.java | 11 +++++
10 files changed, 106 insertions(+), 35 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/CombineRetentionPolicy.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/CombineRetentionPolicy.java
index 38c1d9c..9991e5c 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/CombineRetentionPolicy.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/CombineRetentionPolicy.java
@@ -30,7 +30,9 @@ import org.apache.commons.lang3.reflect.ConstructorUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -38,6 +40,8 @@ import com.google.common.collect.Sets;
import org.apache.gobblin.data.management.retention.DatasetCleaner;
import org.apache.gobblin.data.management.version.DatasetVersion;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.PropertiesUtils;
/**
@@ -62,10 +66,16 @@ import
org.apache.gobblin.data.management.version.DatasetVersion;
*/
public class CombineRetentionPolicy<T extends DatasetVersion> implements
RetentionPolicy<T> {
+ public static final String COMBINE_RETENTION_POLICIES =
+ DatasetCleaner.CONFIGURATION_KEY_PREFIX +
"combine.retention.policy.classes";
+ /**
+ * @deprecated Use {@link #COMBINE_RETENTION_POLICIES} instead.
+ */
public static final String RETENTION_POLICIES_PREFIX =
DatasetCleaner.CONFIGURATION_KEY_PREFIX +
"combine.retention.policy.class.";
public static final String DELETE_SETS_COMBINE_OPERATION =
DatasetCleaner.CONFIGURATION_KEY_PREFIX +
"combine.retention.policy.delete.sets.combine.operation";
+ private static final Splitter COMMA_BASED_SPLITTER =
Splitter.on(",").omitEmptyStrings().trimResults();
public enum DeletableCombineOperation {
INTERSECT,
@@ -85,22 +95,7 @@ public class CombineRetentionPolicy<T extends
DatasetVersion> implements Retenti
public CombineRetentionPolicy(Properties props) throws IOException {
Preconditions.checkArgument(props.containsKey(DELETE_SETS_COMBINE_OPERATION),
"Combine operation not specified.");
- ImmutableList.Builder<RetentionPolicy<T>> builder =
ImmutableList.builder();
-
- for (String property : props.stringPropertyNames()) {
- if (property.startsWith(RETENTION_POLICIES_PREFIX)) {
-
- try {
- builder.add((RetentionPolicy<T>) ConstructorUtils
- .invokeConstructor(Class.forName(props.getProperty(property)),
props));
- } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException
- | ClassNotFoundException e) {
- throw new IllegalArgumentException(e);
- }
- }
- }
-
- this.retentionPolicies = builder.build();
+ this.retentionPolicies = findRetentionPolicies(props);
if (this.retentionPolicies.size() == 0) {
throw new IOException("No retention policies specified for " +
CombineRetentionPolicy.class.getCanonicalName());
}
@@ -110,6 +105,29 @@ public class CombineRetentionPolicy<T extends
DatasetVersion> implements Retenti
}
+ private List<RetentionPolicy<T>> findRetentionPolicies(Properties props) {
+ List<String> retentionPolicyClasses;
+ ImmutableList.Builder<RetentionPolicy<T>> builder =
ImmutableList.builder();
+ ClassAliasResolver<?> aliasResolver = new
ClassAliasResolver<>(RetentionPolicy.class);
+
+ if (props.containsKey(COMBINE_RETENTION_POLICIES)) {
+ retentionPolicyClasses =
COMMA_BASED_SPLITTER.splitToList(props.getProperty(COMBINE_RETENTION_POLICIES));
+ } else {
+ retentionPolicyClasses = PropertiesUtils.getValuesAsList(props,
Optional.of(RETENTION_POLICIES_PREFIX));
+ }
+
+ for (String retentionPolicyClass : retentionPolicyClasses) {
+ try {
+ builder.add((RetentionPolicy<T>) ConstructorUtils.invokeConstructor(
+ Class.forName(aliasResolver.resolve(retentionPolicyClass)),
props));
+ } catch (ReflectiveOperationException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ return builder.build();
+ }
+
/**
* Returns the most specific common superclass for the {@link #versionClass}
of each embedded policy.
*/
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteAllRetentionPolicy.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteAllRetentionPolicy.java
index cd4bfdf..e06239d 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteAllRetentionPolicy.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteAllRetentionPolicy.java
@@ -21,12 +21,14 @@ import java.util.Collection;
import java.util.List;
import java.util.Properties;
+import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.data.management.retention.version.DatasetVersion;
/**
* Implementation of {@link RetentionPolicy} that marks all {@link
DatasetVersion}s as deletable.
*/
+@Alias("DeleteAll")
public class DeleteAllRetentionPolicy implements
RetentionPolicy<DatasetVersion> {
public DeleteAllRetentionPolicy(Properties properties) {}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteNothingRetentionPolicy.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteNothingRetentionPolicy.java
index 12d0be7..10f388e 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteNothingRetentionPolicy.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteNothingRetentionPolicy.java
@@ -23,12 +23,14 @@ import java.util.Properties;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.data.management.retention.version.DatasetVersion;
/**
* A {@link RetentionPolicy} that does not delete any versions. Basically a
pass through dummy policy.
*/
+@Alias("dummy")
public class DeleteNothingRetentionPolicy implements
RetentionPolicy<DatasetVersion> {
public DeleteNothingRetentionPolicy(Properties properties) {}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/NewestKRetentionPolicy.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/NewestKRetentionPolicy.java
index cb3ce2b..95ef093 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/NewestKRetentionPolicy.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/NewestKRetentionPolicy.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.data.management.retention.DatasetCleaner;
import org.apache.gobblin.data.management.version.DatasetVersion;
@@ -34,6 +35,7 @@ import
org.apache.gobblin.data.management.version.DatasetVersion;
/**
* Retains the newest k versions of the dataset.
*/
+@Alias("NewestK")
public class NewestKRetentionPolicy<T extends DatasetVersion> implements
RetentionPolicy<T> {
private static final Logger LOGGER =
LoggerFactory.getLogger(NewestKRetentionPolicy.class);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/PredicateRetentionPolicy.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/PredicateRetentionPolicy.java
index 35aba33..07796d5 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/PredicateRetentionPolicy.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/PredicateRetentionPolicy.java
@@ -27,6 +27,7 @@ import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.data.management.retention.version.DatasetVersion;
@@ -35,6 +36,7 @@ import
org.apache.gobblin.data.management.retention.version.DatasetVersion;
* specified {@link Predicate}. The {@link Predicate} class is determined by
the key
* {@link #RETENTION_POLICY_PREDICATE_CLASS}.
*/
+@Alias("PredicateRetention")
public class PredicateRetentionPolicy implements
RetentionPolicy<DatasetVersion> {
private final Predicate<DatasetVersion> predicate;
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/TimeBasedRetentionPolicy.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/TimeBasedRetentionPolicy.java
index 51c8a82..2d568ec 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/TimeBasedRetentionPolicy.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/TimeBasedRetentionPolicy.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.data.management.retention.DatasetCleaner;
import org.apache.gobblin.data.management.version.DatasetVersion;
import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
@@ -44,6 +45,7 @@ import org.apache.gobblin.util.ConfigUtils;
* Retain dataset versions newer than now - {@link #retention}.
*/
@Slf4j
+@Alias("TimeBased")
public class TimeBasedRetentionPolicy implements
RetentionPolicy<TimestampedDatasetVersion> {
public static final String RETENTION_MINUTES_KEY =
DatasetCleaner.CONFIGURATION_KEY_PREFIX + "minutes.retained";
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CombineRetentionPolicyTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CombineRetentionPolicyTest.java
index b441ed9..6eabb06 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CombineRetentionPolicyTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CombineRetentionPolicyTest.java
@@ -26,18 +26,16 @@ import
org.apache.gobblin.data.management.version.FileStatusDatasetVersion;
import org.apache.gobblin.data.management.version.StringDatasetVersion;
import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
-import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
import java.util.Properties;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -45,6 +43,30 @@ import com.google.common.collect.Sets;
public class CombineRetentionPolicyTest {
@Test
+ public void testConfig() throws IOException {
+ Properties props = new Properties();
+
+ props.setProperty(CombineRetentionPolicy.COMBINE_RETENTION_POLICIES,
"ContainsA,"
+ + ContainsBRetentionPolicy.class.getCanonicalName() + "," +
ContainsCRetentionPolicy.class.getCanonicalName());
+ props.setProperty(CombineRetentionPolicy.DELETE_SETS_COMBINE_OPERATION,
+ CombineRetentionPolicy.DeletableCombineOperation.UNION.name());
+
+ CombineRetentionPolicy<DatasetVersion> policy = new
CombineRetentionPolicy<>(props);
+
+ Collection<DatasetVersion> deletableVersions =
policy.listDeletableVersions(Lists
+ .newArrayList(new StringDatasetVersion("a", new Path("/")),
+ new StringDatasetVersion("abc", new Path("/")), new
StringDatasetVersion("abcd", new Path("/")),
+ new StringDatasetVersion("bc", new Path("/")), new
StringDatasetVersion("d", new Path("/"))));
+
+ Set<String> actualDeletableVersions =
+ deletableVersions.stream().map(input -> ((StringDatasetVersion)
input).getVersion()).collect(Collectors.toSet());
+
+ Assert.assertEquals(policy.versionClass(), StringDatasetVersion.class);
+ Assert.assertEquals(deletableVersions.size(), 4);
+ Assert.assertEquals(actualDeletableVersions, Sets.newHashSet("abcd",
"abc", "a", "bc"));
+ }
+
+ @Test
public void testIntersect() throws IOException {
Properties props = new Properties();
@@ -57,19 +79,15 @@ public class CombineRetentionPolicyTest {
props.setProperty(CombineRetentionPolicy.DELETE_SETS_COMBINE_OPERATION,
CombineRetentionPolicy.DeletableCombineOperation.INTERSECT.name());
- CombineRetentionPolicy policy = new CombineRetentionPolicy(props);
+ CombineRetentionPolicy<DatasetVersion> policy = new
CombineRetentionPolicy<>(props);
Collection<DatasetVersion> deletableVersions =
policy.listDeletableVersions(Lists
- .<DatasetVersion>newArrayList(new StringDatasetVersion("a", new
Path("/")),
+ .newArrayList(new StringDatasetVersion("a", new Path("/")),
new StringDatasetVersion("abc", new Path("/")), new
StringDatasetVersion("abcd", new Path("/")),
new StringDatasetVersion("bc", new Path("/")), new
StringDatasetVersion("d", new Path("/"))));
- Set<String> actualDeletableVersions = Sets
- .newHashSet(Iterables.transform(deletableVersions, new
Function<DatasetVersion, String>() {
- @Nullable @Override public String apply(DatasetVersion input) {
- return ((StringDatasetVersion) input).getVersion();
- }
- }));
+ Set<String> actualDeletableVersions =
+ deletableVersions.stream().map(input -> ((StringDatasetVersion)
input).getVersion()).collect(Collectors.toSet());
Assert.assertEquals(policy.versionClass(), StringDatasetVersion.class);
Assert.assertEquals(deletableVersions.size(), 2);
@@ -90,19 +108,16 @@ public class CombineRetentionPolicyTest {
props.setProperty(CombineRetentionPolicy.DELETE_SETS_COMBINE_OPERATION,
CombineRetentionPolicy.DeletableCombineOperation.UNION.name());
- CombineRetentionPolicy policy = new CombineRetentionPolicy(props);
+ CombineRetentionPolicy<DatasetVersion> policy = new
CombineRetentionPolicy<>(props);
Collection<DatasetVersion> deletableVersions =
policy.listDeletableVersions(Lists
- .<DatasetVersion>newArrayList(new StringDatasetVersion("a", new
Path("/")),
+ .newArrayList(new StringDatasetVersion("a", new Path("/")),
new StringDatasetVersion("abc", new Path("/")), new
StringDatasetVersion("abcd", new Path("/")),
new StringDatasetVersion("bc", new Path("/")), new
StringDatasetVersion("d", new Path("/"))));
- Set<String> actualDeletableVersions = Sets
- .newHashSet(Iterables.transform(deletableVersions, new
Function<DatasetVersion, String>() {
- @Nullable @Override public String apply(DatasetVersion input) {
- return ((StringDatasetVersion) input).getVersion();
- }
- }));
+ Set<String> actualDeletableVersions =
+ deletableVersions.stream().map(input -> ((StringDatasetVersion)
input).getVersion())
+ .collect(Collectors.toSet());
Assert.assertEquals(deletableVersions.size(), 4);
Assert.assertEquals(actualDeletableVersions, Sets.newHashSet("abcd",
"abc", "a", "bc"));
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/test/ContainsARetentionPolicy.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/test/ContainsARetentionPolicy.java
index 01538d0..6f16e20 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/test/ContainsARetentionPolicy.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/test/ContainsARetentionPolicy.java
@@ -19,10 +19,13 @@ package org.apache.gobblin.data.management.retention.test;
import java.util.Properties;
+import org.apache.gobblin.annotation.Alias;
+
/**
* RetentionPolicy that deletes versions containing the character "a" in its
name.
*/
+@Alias("ContainsA")
public class ContainsARetentionPolicy extends ContainsStringRetentionPolicy {
public ContainsARetentionPolicy(Properties props) {
super(props);
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 0eb8650..527e250 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -85,6 +85,20 @@ public class PropertiesUtils {
}
/**
+ * Extract all the values whose keys start with a <code>prefix</code>
+ * @param properties the given {@link Properties} instance
+ * @param prefix of keys to be extracted
+ * @return a list of the values whose keys start with <code>prefix</code>, or of all values if no prefix is present
+ */
+ public static List<String> getValuesAsList(Properties properties,
Optional<String> prefix) {
+ if (prefix.isPresent()) {
+ properties = extractPropertiesWithPrefix(properties, prefix);
+ }
+ Properties finalProperties = properties;
+ return properties.keySet().stream().map(key ->
finalProperties.getProperty(key.toString())).collect(Collectors.toList());
+ }
+
+ /**
* Get the value of a property as a list of strings, using the given default
value if the property is not set.
*
* @param key property key
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
index 69c95c2..ae2ae80 100644
---
a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
@@ -63,4 +63,15 @@ public class PropertiesUtilsTest {
Assert.assertEquals(PropertiesUtils.getPropAsList(properties, "key"),
ImmutableList.of("1", "2", "3"));
Assert.assertEquals(PropertiesUtils.getPropAsList(properties, "key2",
"default"), ImmutableList.of("default"));
}
+
+ @Test
+ public void testGetValuesAsList() {
+ Properties properties = new Properties();
+ properties.put("k1", "v1");
+ properties.put("k2", "v2");
+ properties.put("k3", "v2");
+ properties.put("K3", "v4");
+
+ Assert.assertEqualsNoOrder(PropertiesUtils.getValuesAsList(properties,
Optional.of("k")).toArray(), new String[]{"v1", "v2", "v2"});
+ }
}