This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bf3357f  [GOBBLIN-1302] add COMBINE_RETENTION_POLICIES config
bf3357f is described below

commit bf3357f7502017d68e974256cdd5b086bdfc8bc5
Author: Arjun <[email protected]>
AuthorDate: Mon Oct 26 15:43:07 2020 -0700

    [GOBBLIN-1302] add COMBINE_RETENTION_POLICIES config
    
    Closes #3140 from
    arjun4084346/combineRetentionPolicyConfigs
---
 .../retention/policy/CombineRetentionPolicy.java   | 50 +++++++++++++-------
 .../retention/policy/DeleteAllRetentionPolicy.java |  2 +
 .../policy/DeleteNothingRetentionPolicy.java       |  2 +
 .../retention/policy/NewestKRetentionPolicy.java   |  2 +
 .../retention/policy/PredicateRetentionPolicy.java |  2 +
 .../retention/policy/TimeBasedRetentionPolicy.java |  2 +
 .../retention/CombineRetentionPolicyTest.java      | 53 ++++++++++++++--------
 .../retention/test/ContainsARetentionPolicy.java   |  3 ++
 .../org/apache/gobblin/util/PropertiesUtils.java   | 14 ++++++
 .../apache/gobblin/util/PropertiesUtilsTest.java   | 11 +++++
 10 files changed, 106 insertions(+), 35 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/CombineRetentionPolicy.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/CombineRetentionPolicy.java
index 38c1d9c..9991e5c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/CombineRetentionPolicy.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/CombineRetentionPolicy.java
@@ -30,7 +30,9 @@ import org.apache.commons.lang3.reflect.ConstructorUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -38,6 +40,8 @@ import com.google.common.collect.Sets;
 
 import org.apache.gobblin.data.management.retention.DatasetCleaner;
 import org.apache.gobblin.data.management.version.DatasetVersion;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.PropertiesUtils;
 
 
 /**
@@ -62,10 +66,16 @@ import org.apache.gobblin.data.management.version.DatasetVersion;
  */
 public class CombineRetentionPolicy<T extends DatasetVersion> implements 
RetentionPolicy<T> {
 
+  public static final String COMBINE_RETENTION_POLICIES =
+      DatasetCleaner.CONFIGURATION_KEY_PREFIX + 
"combine.retention.policy.classes";
+  /**
+   * @Deprecated , use COMBINE_RETENTION_POLICIES instead.
+   */
   public static final String RETENTION_POLICIES_PREFIX =
       DatasetCleaner.CONFIGURATION_KEY_PREFIX + 
"combine.retention.policy.class.";
   public static final String DELETE_SETS_COMBINE_OPERATION =
       DatasetCleaner.CONFIGURATION_KEY_PREFIX + 
"combine.retention.policy.delete.sets.combine.operation";
+  private static final Splitter COMMA_BASED_SPLITTER = 
Splitter.on(",").omitEmptyStrings().trimResults();
 
   public enum DeletableCombineOperation {
     INTERSECT,
@@ -85,22 +95,7 @@ public class CombineRetentionPolicy<T extends DatasetVersion> implements Retenti
   public CombineRetentionPolicy(Properties props) throws IOException {
     
Preconditions.checkArgument(props.containsKey(DELETE_SETS_COMBINE_OPERATION), 
"Combine operation not specified.");
 
-    ImmutableList.Builder<RetentionPolicy<T>> builder = 
ImmutableList.builder();
-
-    for (String property : props.stringPropertyNames()) {
-      if (property.startsWith(RETENTION_POLICIES_PREFIX)) {
-
-        try {
-          builder.add((RetentionPolicy<T>) ConstructorUtils
-              .invokeConstructor(Class.forName(props.getProperty(property)), 
props));
-        } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException
-            | ClassNotFoundException e) {
-          throw new IllegalArgumentException(e);
-        }
-      }
-    }
-
-    this.retentionPolicies = builder.build();
+    this.retentionPolicies = findRetentionPolicies(props);
     if (this.retentionPolicies.size() == 0) {
       throw new IOException("No retention policies specified for " + 
CombineRetentionPolicy.class.getCanonicalName());
     }
@@ -110,6 +105,29 @@ public class CombineRetentionPolicy<T extends DatasetVersion> implements Retenti
 
   }
 
+  private List<RetentionPolicy<T>> findRetentionPolicies(Properties props) {
+    List<String> retentionPolicyClasses;
+    ImmutableList.Builder<RetentionPolicy<T>> builder = 
ImmutableList.builder();
+    ClassAliasResolver<?> aliasResolver = new 
ClassAliasResolver<>(RetentionPolicy.class);
+
+    if (props.containsKey(COMBINE_RETENTION_POLICIES)) {
+      retentionPolicyClasses = 
COMMA_BASED_SPLITTER.splitToList(props.getProperty(COMBINE_RETENTION_POLICIES));
+    } else {
+      retentionPolicyClasses = PropertiesUtils.getValuesAsList(props, 
Optional.of(RETENTION_POLICIES_PREFIX));
+    }
+
+    for (String retentionPolicyClass : retentionPolicyClasses) {
+      try {
+        builder.add((RetentionPolicy<T>) ConstructorUtils.invokeConstructor(
+            Class.forName(aliasResolver.resolve(retentionPolicyClass)), 
props));
+      } catch (ReflectiveOperationException e) {
+        throw new IllegalArgumentException(e);
+      }
+    }
+
+    return builder.build();
+  }
+
   /**
    * Returns the most specific common superclass for the {@link #versionClass} 
of each embedded policy.
    */
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteAllRetentionPolicy.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteAllRetentionPolicy.java
index cd4bfdf..e06239d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteAllRetentionPolicy.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteAllRetentionPolicy.java
@@ -21,12 +21,14 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.data.management.retention.version.DatasetVersion;
 
 
 /**
  * Implementation of {@link RetentionPolicy} that marks all {@link 
DatasetVersion}s as deletable.
  */
+@Alias("DeleteAll")
 public class DeleteAllRetentionPolicy implements 
RetentionPolicy<DatasetVersion> {
 
   public DeleteAllRetentionPolicy(Properties properties) {}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteNothingRetentionPolicy.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteNothingRetentionPolicy.java
index 12d0be7..10f388e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteNothingRetentionPolicy.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/DeleteNothingRetentionPolicy.java
@@ -23,12 +23,14 @@ import java.util.Properties;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.data.management.retention.version.DatasetVersion;
 
 
 /**
  * A {@link RetentionPolicy} that does not delete any versions. Basically a 
pass through dummy policy.
  */
+@Alias("dummy")
 public class DeleteNothingRetentionPolicy implements 
RetentionPolicy<DatasetVersion> {
 
   public DeleteNothingRetentionPolicy(Properties properties) {}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/NewestKRetentionPolicy.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/NewestKRetentionPolicy.java
index cb3ce2b..95ef093 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/NewestKRetentionPolicy.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/NewestKRetentionPolicy.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.data.management.retention.DatasetCleaner;
 import org.apache.gobblin.data.management.version.DatasetVersion;
 
@@ -34,6 +35,7 @@ import org.apache.gobblin.data.management.version.DatasetVersion;
 /**
  * Retains the newest k versions of the dataset.
  */
+@Alias("NewestK")
 public class NewestKRetentionPolicy<T extends DatasetVersion> implements 
RetentionPolicy<T> {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(NewestKRetentionPolicy.class);
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/PredicateRetentionPolicy.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/PredicateRetentionPolicy.java
index 35aba33..07796d5 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/PredicateRetentionPolicy.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/PredicateRetentionPolicy.java
@@ -27,6 +27,7 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
+import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.data.management.retention.version.DatasetVersion;
 
 
@@ -35,6 +36,7 @@ import org.apache.gobblin.data.management.retention.version.DatasetVersion;
  * specified {@link Predicate}. The {@link Predicate} class is determined by 
the key
  * {@link #RETENTION_POLICY_PREDICATE_CLASS}.
  */
+@Alias("PredicateRetention")
 public class PredicateRetentionPolicy implements 
RetentionPolicy<DatasetVersion> {
 
   private final Predicate<DatasetVersion> predicate;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/TimeBasedRetentionPolicy.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/TimeBasedRetentionPolicy.java
index 51c8a82..2d568ec 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/TimeBasedRetentionPolicy.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/policy/TimeBasedRetentionPolicy.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.data.management.retention.DatasetCleaner;
 import org.apache.gobblin.data.management.version.DatasetVersion;
 import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
@@ -44,6 +45,7 @@ import org.apache.gobblin.util.ConfigUtils;
  * Retain dataset versions newer than now - {@link #retention}.
  */
 @Slf4j
+@Alias("TimeBased")
 public class TimeBasedRetentionPolicy implements 
RetentionPolicy<TimestampedDatasetVersion> {
 
   public static final String RETENTION_MINUTES_KEY = 
DatasetCleaner.CONFIGURATION_KEY_PREFIX + "minutes.retained";
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CombineRetentionPolicyTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CombineRetentionPolicyTest.java
index b441ed9..6eabb06 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CombineRetentionPolicyTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CombineRetentionPolicyTest.java
@@ -26,18 +26,16 @@ import org.apache.gobblin.data.management.version.FileStatusDatasetVersion;
 import org.apache.gobblin.data.management.version.StringDatasetVersion;
 import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -45,6 +43,30 @@ import com.google.common.collect.Sets;
 public class CombineRetentionPolicyTest {
 
   @Test
+  public void testConfig() throws IOException {
+    Properties props = new Properties();
+
+    props.setProperty(CombineRetentionPolicy.COMBINE_RETENTION_POLICIES, 
"ContainsA,"
+        + ContainsBRetentionPolicy.class.getCanonicalName() + "," + 
ContainsCRetentionPolicy.class.getCanonicalName());
+    props.setProperty(CombineRetentionPolicy.DELETE_SETS_COMBINE_OPERATION,
+        CombineRetentionPolicy.DeletableCombineOperation.UNION.name());
+
+    CombineRetentionPolicy<DatasetVersion> policy = new 
CombineRetentionPolicy<>(props);
+
+    Collection<DatasetVersion> deletableVersions = 
policy.listDeletableVersions(Lists
+        .newArrayList(new StringDatasetVersion("a", new Path("/")),
+            new StringDatasetVersion("abc", new Path("/")), new 
StringDatasetVersion("abcd", new Path("/")),
+            new StringDatasetVersion("bc", new Path("/")), new 
StringDatasetVersion("d", new Path("/"))));
+
+    Set<String> actualDeletableVersions =
+        deletableVersions.stream().map(input -> ((StringDatasetVersion) 
input).getVersion()).collect(Collectors.toSet());
+
+    Assert.assertEquals(policy.versionClass(), StringDatasetVersion.class);
+    Assert.assertEquals(deletableVersions.size(), 4);
+    Assert.assertEquals(actualDeletableVersions, Sets.newHashSet("abcd", 
"abc", "a", "bc"));
+  }
+
+  @Test
   public void testIntersect() throws IOException {
     Properties props = new Properties();
 
@@ -57,19 +79,15 @@ public class CombineRetentionPolicyTest {
     props.setProperty(CombineRetentionPolicy.DELETE_SETS_COMBINE_OPERATION,
         CombineRetentionPolicy.DeletableCombineOperation.INTERSECT.name());
 
-    CombineRetentionPolicy policy = new CombineRetentionPolicy(props);
+    CombineRetentionPolicy<DatasetVersion> policy = new 
CombineRetentionPolicy<>(props);
 
     Collection<DatasetVersion> deletableVersions = 
policy.listDeletableVersions(Lists
-            .<DatasetVersion>newArrayList(new StringDatasetVersion("a", new 
Path("/")),
+            .newArrayList(new StringDatasetVersion("a", new Path("/")),
                 new StringDatasetVersion("abc", new Path("/")), new 
StringDatasetVersion("abcd", new Path("/")),
                 new StringDatasetVersion("bc", new Path("/")), new 
StringDatasetVersion("d", new Path("/"))));
 
-    Set<String> actualDeletableVersions = Sets
-        .newHashSet(Iterables.transform(deletableVersions, new 
Function<DatasetVersion, String>() {
-          @Nullable @Override public String apply(DatasetVersion input) {
-            return ((StringDatasetVersion) input).getVersion();
-          }
-        }));
+    Set<String> actualDeletableVersions =
+        deletableVersions.stream().map(input -> ((StringDatasetVersion) 
input).getVersion()).collect(Collectors.toSet());
 
     Assert.assertEquals(policy.versionClass(), StringDatasetVersion.class);
     Assert.assertEquals(deletableVersions.size(), 2);
@@ -90,19 +108,16 @@ public class CombineRetentionPolicyTest {
     props.setProperty(CombineRetentionPolicy.DELETE_SETS_COMBINE_OPERATION,
         CombineRetentionPolicy.DeletableCombineOperation.UNION.name());
 
-    CombineRetentionPolicy policy = new CombineRetentionPolicy(props);
+    CombineRetentionPolicy<DatasetVersion> policy = new 
CombineRetentionPolicy<>(props);
 
     Collection<DatasetVersion> deletableVersions = 
policy.listDeletableVersions(Lists
-        .<DatasetVersion>newArrayList(new StringDatasetVersion("a", new 
Path("/")),
+        .newArrayList(new StringDatasetVersion("a", new Path("/")),
             new StringDatasetVersion("abc", new Path("/")), new 
StringDatasetVersion("abcd", new Path("/")),
             new StringDatasetVersion("bc", new Path("/")), new 
StringDatasetVersion("d", new Path("/"))));
 
-    Set<String> actualDeletableVersions = Sets
-        .newHashSet(Iterables.transform(deletableVersions, new 
Function<DatasetVersion, String>() {
-          @Nullable @Override public String apply(DatasetVersion input) {
-            return ((StringDatasetVersion) input).getVersion();
-          }
-        }));
+    Set<String> actualDeletableVersions =
+        deletableVersions.stream().map(input -> ((StringDatasetVersion) 
input).getVersion())
+            .collect(Collectors.toSet());
 
     Assert.assertEquals(deletableVersions.size(), 4);
     Assert.assertEquals(actualDeletableVersions, Sets.newHashSet("abcd", 
"abc", "a", "bc"));
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/test/ContainsARetentionPolicy.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/test/ContainsARetentionPolicy.java
index 01538d0..6f16e20 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/test/ContainsARetentionPolicy.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/test/ContainsARetentionPolicy.java
@@ -19,10 +19,13 @@ package org.apache.gobblin.data.management.retention.test;
 
 import java.util.Properties;
 
+import org.apache.gobblin.annotation.Alias;
+
 
 /**
  * RetentionPolivy that deletes versions containing the character "a" in its 
name.
  */
+@Alias("ContainsA")
 public class ContainsARetentionPolicy extends ContainsStringRetentionPolicy {
   public ContainsARetentionPolicy(Properties props) {
     super(props);
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 0eb8650..527e250 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -85,6 +85,20 @@ public class PropertiesUtils {
   }
 
   /**
+   * Extract all the values whose keys start with a <code>prefix</code>
+   * @param properties the given {@link Properties} instance
+   * @param prefix of keys to be extracted
+   * @return a list of values in the properties
+   */
+  public static List<String> getValuesAsList(Properties properties, 
Optional<String> prefix) {
+    if (prefix.isPresent()) {
+      properties = extractPropertiesWithPrefix(properties, prefix);
+    }
+    Properties finalProperties = properties;
+    return properties.keySet().stream().map(key -> 
finalProperties.getProperty(key.toString())).collect(Collectors.toList());
+  }
+
+  /**
    * Get the value of a property as a list of strings, using the given default 
value if the property is not set.
    *
    * @param key property key
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
index 69c95c2..ae2ae80 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
@@ -63,4 +63,15 @@ public class PropertiesUtilsTest {
     Assert.assertEquals(PropertiesUtils.getPropAsList(properties, "key"), 
ImmutableList.of("1", "2", "3"));
     Assert.assertEquals(PropertiesUtils.getPropAsList(properties, "key2", 
"default"), ImmutableList.of("default"));
   }
+
+  @Test
+  public void testGetValuesAsList() {
+    Properties properties = new Properties();
+    properties.put("k1", "v1");
+    properties.put("k2", "v2");
+    properties.put("k3", "v2");
+    properties.put("K3", "v4");
+
+    Assert.assertEqualsNoOrder(PropertiesUtils.getValuesAsList(properties, 
Optional.of("k")).toArray(), new String[]{"v1", "v2", "v2"});
+  }
 }

Reply via email to