Repository: falcon Updated Branches: refs/heads/master 6e50f31ad -> e4c635c2c
FALCON-1656 Improve FeedHelper:getRetentionFrequency method. Contributed by Ajay Yadava. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/e4c635c2 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/e4c635c2 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/e4c635c2 Branch: refs/heads/master Commit: e4c635c2c6ea454d014eb72a1b199ef6b92036fd Parents: 6e50f31 Author: Ajay Yadava <[email protected]> Authored: Thu Dec 10 17:13:42 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Thu Dec 10 17:13:42 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/falcon/entity/FeedHelper.java | 22 ++++---- .../falcon/entity/parser/FeedEntityParser.java | 2 +- .../lifecycle/retention/AgeBasedDelete.java | 2 +- .../apache/falcon/entity/FeedHelperTest.java | 56 ++++++++++++-------- .../retention/AgeBasedCoordinatorBuilder.java | 2 +- 6 files changed, 52 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/e4c635c2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9ea8c79..63695ac 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -31,6 +31,8 @@ Trunk (Unreleased) FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1656 Improve FeedHelper:getRetentionFrequency method(Ajay Yadava) + FALCON-1616 Consume Workflow job end notifications for SLA monitoring(Sandeep Samudrala via Ajay Yadava) FALCON-1634 Add .reviewboardrc file so that review requests can be created using just command line(Rajat Khandelwal via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/e4c635c2/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index d601c5d..29daff3 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -993,18 +993,20 @@ public final class FeedHelper { return cluster != null && (feed.getLifecycle() != null || cluster.getLifecycle() != null); } - public static Frequency getRetentionFrequency(Feed feed, String clusterName) throws FalconException { - Frequency retentionFrequency; + public static Frequency getLifecycleRetentionFrequency(Feed feed, String clusterName) throws FalconException { + Frequency retentionFrequency = null; RetentionStage retentionStage = getRetentionStage(feed, clusterName); - if (retentionStage != null && retentionStage.getFrequency() != null) { - retentionFrequency = retentionStage.getFrequency(); - } else { - Frequency feedFrequency = feed.getFrequency(); - Frequency defaultFrequency = new Frequency("hours(6)"); - if (DateUtil.getFrequencyInMillis(feedFrequency) < DateUtil.getFrequencyInMillis(defaultFrequency)) { - retentionFrequency = defaultFrequency; + if (retentionStage != null) { + if (retentionStage.getFrequency() != null) { + retentionFrequency = retentionStage.getFrequency(); } else { - retentionFrequency = new Frequency(feedFrequency.toString()); + Frequency feedFrequency = feed.getFrequency(); + Frequency defaultFrequency = new Frequency("hours(6)"); + if (DateUtil.getFrequencyInMillis(feedFrequency) < DateUtil.getFrequencyInMillis(defaultFrequency)) { + retentionFrequency = defaultFrequency; + } else { + retentionFrequency = new Frequency(feedFrequency.toString()); + } } } return retentionFrequency; http://git-wip-us.apache.org/repos/asf/falcon/blob/e4c635c2/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java index c70f18d..0b48e66 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java @@ -152,7 +152,7 @@ public class FeedEntityParser extends EntityParser<Feed> { } private void validateRetentionFrequency(Feed feed, String clusterName) throws FalconException { - Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, clusterName); + Frequency retentionFrequency = FeedHelper.getLifecycleRetentionFrequency(feed, clusterName); Frequency feedFrequency = feed.getFrequency(); if (DateUtil.getFrequencyInMillis(retentionFrequency) < DateUtil.getFrequencyInMillis(feedFrequency)) { throw new ValidationException("Retention can not be more frequent than data availability."); http://git-wip-us.apache.org/repos/asf/falcon/blob/e4c635c2/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java index ccb0290..8d735f9 100644 --- a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java +++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java @@ -60,7 +60,7 @@ public class AgeBasedDelete extends RetentionPolicy { private void validateRetentionFrequencyForOozie(Feed feed, String clusterName) throws FalconException { // retention shouldn't be more frequent than hours(1) for Oozie Builders. - Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, clusterName); + Frequency retentionFrequency = FeedHelper.getLifecycleRetentionFrequency(feed, clusterName); if (retentionFrequency.getTimeUnit() == Frequency.TimeUnit.minutes && retentionFrequency.getFrequencyAsInt() < 60) { throw new ValidationException("Feed Retention can not be more frequent than hours(1)"); http://git-wip-us.apache.org/repos/asf/falcon/blob/e4c635c2/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java index 3e422fc..98cdf6b 100644 --- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java @@ -745,6 +745,7 @@ public class FeedHelperTest extends AbstractTestBase { Assert.assertEquals(result, expected); } + @Test public void testIsLifeCycleEnabled() throws Exception { Feed feed = new Feed(); @@ -782,23 +783,26 @@ public class FeedHelperTest extends AbstractTestBase { Feed feed = new Feed(); feed.setFrequency(new Frequency("days(1)")); - // lifecycle is not defined + // retention stage frequency is not defined + Lifecycle globalLifecycle = new Lifecycle(); + RetentionStage globalRetentionStage = new RetentionStage(); + globalLifecycle.setRetentionStage(globalRetentionStage); + feed.setLifecycle(globalLifecycle); + Clusters clusters = new Clusters(); org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster(); cluster.setName("cluster1"); clusters.getClusters().add(cluster); feed.setClusters(clusters); - Assert.assertNull(FeedHelper.getRetentionStage(feed, cluster.getName())); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("days(1)")); + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), + new Frequency("days(1)")); - // lifecycle is defined at global level - Lifecycle globalLifecycle = new Lifecycle(); - RetentionStage globalRetentionStage = new RetentionStage(); + // lifecycle is defined only at global level globalRetentionStage.setFrequency(new Frequency("hours(2)")); globalLifecycle.setRetentionStage(globalRetentionStage); feed.setLifecycle(globalLifecycle); Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName())); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), feed.getLifecycle().getRetentionStage().getFrequency()); // lifecycle is defined at both global and cluster level @@ -808,27 +812,27 @@ public class FeedHelperTest extends AbstractTestBase { clusterLifecycle.setRetentionStage(clusterRetentionStage); feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle); Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName())); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), cluster.getLifecycle().getRetentionStage().getFrequency()); // lifecycle at both level - retention only at cluster level. feed.getLifecycle().setRetentionStage(null); Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName())); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), cluster.getLifecycle().getRetentionStage().getFrequency()); // lifecycle at both level - retention only at global level. feed.getLifecycle().setRetentionStage(globalRetentionStage); feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(null); Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName())); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), feed.getLifecycle().getRetentionStage().getFrequency()); // lifecycle is defined only at cluster level feed.setLifecycle(null); feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(clusterRetentionStage); Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName())); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), cluster.getLifecycle().getRetentionStage().getFrequency()); } @@ -837,30 +841,38 @@ public class FeedHelperTest extends AbstractTestBase { Feed feed = new Feed(); feed.setFrequency(new Frequency("days(10)")); - // no lifecycle defined - test both daily and monthly feeds + // no retention stage frequency defined - test both daily and monthly feeds + Lifecycle globalLifecycle = new Lifecycle(); + RetentionStage globalRetentionStage = new RetentionStage(); + globalLifecycle.setRetentionStage(globalRetentionStage); + feed.setLifecycle(globalLifecycle); + Clusters clusters = new Clusters(); org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster(); cluster.setName("cluster1"); clusters.getClusters().add(cluster); feed.setClusters(clusters); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("days(10)")); + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), + new Frequency("days(10)")); feed.setFrequency(new Frequency("hours(1)")); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(6)")); + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), + new Frequency("hours(6)")); feed.setFrequency(new Frequency("minutes(10)")); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(6)")); + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), + new Frequency("hours(6)")); feed.setFrequency(new Frequency("hours(7)")); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(7)")); + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), + new Frequency("hours(7)")); feed.setFrequency(new Frequency("days(2)")); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("days(2)")); + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), + new Frequency("days(2)")); // lifecycle at both level - retention only at global level. feed.setFrequency(new Frequency("hours(1)")); - Lifecycle globalLifecycle = new Lifecycle(); - RetentionStage globalRetentionStage = new RetentionStage(); globalRetentionStage.setFrequency(new Frequency("hours(2)")); globalLifecycle.setRetentionStage(globalRetentionStage); feed.setLifecycle(globalLifecycle); @@ -869,12 +881,14 @@ public class FeedHelperTest extends AbstractTestBase { RetentionStage clusterRetentionStage = new RetentionStage(); clusterLifecycle.setRetentionStage(clusterRetentionStage); feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(6)")); + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), + new Frequency("hours(6)")); // lifecycle at both level - retention only at cluster level. feed.getLifecycle().getRetentionStage().setFrequency(null); clusterRetentionStage.setFrequency(new Frequency("hours(4)")); - Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(4)")); + Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), + new Frequency("hours(4)")); } @Test http://git-wip-us.apache.org/repos/asf/falcon/blob/e4c635c2/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java index e3e7fec..47ab2fc 100644 --- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java +++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java @@ -77,7 +77,7 @@ public final class AgeBasedCoordinatorBuilder { coord.setStart(SchemaHelper.formatDateUTC(new Date())); coord.setTimezone(feed.getTimezone().getID()); - Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, cluster.getName()); + Frequency retentionFrequency = FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()); // set controls long frequencyInMillis = ExpressionHelper.get().evaluate(retentionFrequency.toString(), Long.class); CONTROLS controls = new CONTROLS();
