Repository: falcon Updated Branches: refs/heads/0.8 71806b96f -> 660711f83
FALCON-1524 Improve Lifecycle Retention validation checks. Contributed by Ajay Yadava. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/660711f8 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/660711f8 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/660711f8 Branch: refs/heads/0.8 Commit: 660711f831a78dfb2eea2dc7e7bce0bc300947ca Parents: 71806b9 Author: Ajay Yadava <[email protected]> Authored: Tue Oct 13 13:32:58 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Tue Oct 13 14:07:21 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/entity/parser/FeedEntityParser.java | 11 +++++- .../lifecycle/retention/AgeBasedDelete.java | 16 ++++++++ .../java/org/apache/falcon/util/DateUtil.java | 31 ++++++++++++++++ .../entity/parser/FeedEntityParserTest.java | 39 ++++++++++++++++++++ 5 files changed, 98 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/660711f8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6e8486f..2dc7e3c 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -21,6 +21,8 @@ Trunk (Unreleased) FALCON-1027 Falcon proxy user support(Sowmya Ramesh) IMPROVEMENTS + FALCON-1524 Improve Lifecycle Retention validation checks(Ajay Yadava) + FALCON-1516 Feed Retention support in Falcon Unit(Pavan Kolamuri via Pallavi Rao) FALCON-1527 Release Falcon Unit test jar(Pavan Kumar Kolamuri via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/660711f8/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java 
b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java index 6be2495..c5cfdd2 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java @@ -47,8 +47,8 @@ import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.group.FeedGroup; import org.apache.falcon.group.FeedGroupMap; -import org.apache.falcon.util.DateUtil; import org.apache.falcon.service.LifecyclePolicyMap; +import org.apache.falcon.util.DateUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.authorize.AuthorizationException; import org.slf4j.Logger; @@ -133,6 +133,7 @@ public class FeedEntityParser extends EntityParser<Feed> { throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: " + cluster.getName()); } + validateRetentionFrequency(feed, cluster.getName()); for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) { map.get(policyName).validate(feed, cluster.getName()); } @@ -140,6 +141,14 @@ public class FeedEntityParser extends EntityParser<Feed> { } } + private void validateRetentionFrequency(Feed feed, String clusterName) throws FalconException { + Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, clusterName); + Frequency feedFrequency = feed.getFrequency(); + if (DateUtil.getFrequencyInMillis(retentionFrequency) < DateUtil.getFrequencyInMillis(feedFrequency)) { + throw new ValidationException("Retention can not be more frequent than data availability."); + } + } + private Set<Process> findProcesses(Set<Entity> referenced) { Set<Process> processes = new HashSet<Process>(); for (Entity entity : referenced) { http://git-wip-us.apache.org/repos/asf/falcon/blob/660711f8/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java 
---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java index a4ae780..ccb0290 100644 --- a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java +++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java @@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.feed.Property; import org.apache.falcon.entity.v0.feed.RetentionStage; import org.apache.falcon.entity.v0.feed.Sla; import org.apache.falcon.expression.ExpressionHelper; +import org.apache.falcon.util.StartupProperties; import java.util.Date; @@ -48,6 +49,21 @@ public class AgeBasedDelete extends RetentionPolicy { if (cluster != null) { validateLimitWithSla(feed, cluster, retentionLimit.toString()); validateLimitWithLateData(feed, cluster, retentionLimit.toString()); + String lifecycleEngine = StartupProperties.get().getProperty("lifecycle.engine.impl", + "org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory"); + if ("org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory".equals(lifecycleEngine)) { + validateRetentionFrequencyForOozie(feed, clusterName); + } + } + } + + + private void validateRetentionFrequencyForOozie(Feed feed, String clusterName) throws FalconException { + // retention shouldn't be more frequent than hours(1) for Oozie Builders. 
+ Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, clusterName); + if (retentionFrequency.getTimeUnit() == Frequency.TimeUnit.minutes + && retentionFrequency.getFrequencyAsInt() < 60) { + throw new ValidationException("Feed Retention can not be more frequent than hours(1)"); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/660711f8/common/src/main/java/org/apache/falcon/util/DateUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java index 82163cc..b70fa20 100644 --- a/common/src/main/java/org/apache/falcon/util/DateUtil.java +++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java @@ -18,6 +18,7 @@ package org.apache.falcon.util; import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.Frequency; import java.util.Calendar; import java.util.Date; @@ -28,6 +29,11 @@ import java.util.TimeZone; */ public final class DateUtil { + private static final long MINUTE_IN_MS = 60 * 1000L; + private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS; + private static final long DAY_IN_MS = 24 * HOUR_IN_MS; + private static final long MONTH_IN_MS = 31 * DAY_IN_MS; + //Friday, April 16, 9999 7:12:55 AM UTC corresponding date public static final Date NEVER = new Date(Long.parseLong("253379862775000")); @@ -45,6 +51,31 @@ public final class DateUtil { public static String getDateFormatFromTime(long milliSeconds) { return SchemaHelper.getDateFormat().format((new Date(milliSeconds))); + } + + /** + * This function should not be used for scheduling related functions as it may cause correctness issues in those + * scenarios. 
+ * @param frequency the frequency to convert; its time unit and {@code getFrequencyAsInt()} multiplier are used + * @return approximate length of one period in milliseconds (a month is counted as 31 days), or null if the time unit is not one of months, days, hours or minutes + */ + public static Long getFrequencyInMillis(Frequency frequency){ + switch (frequency.getTimeUnit()) { + + case months: + return MONTH_IN_MS * frequency.getFrequencyAsInt(); + + case days: + return DAY_IN_MS * frequency.getFrequencyAsInt(); + + case hours: + return HOUR_IN_MS * frequency.getFrequencyAsInt(); + + case minutes: + return MINUTE_IN_MS * frequency.getFrequencyAsInt(); + default: + return null; + } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/660711f8/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java index 1c43800..905be68 100644 --- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java +++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java @@ -183,6 +183,45 @@ public class FeedEntityParserTest extends AbstractTestBase { parser.validate(feed); } + @Test + public void testValidRetentionFrequency() throws Exception { + Feed feed = parser.parseAndValidate(this.getClass() + .getResourceAsStream(FEED3_XML)); + + feed.setFrequency(Frequency.fromString("minutes(30)")); + Frequency frequency = Frequency.fromString("minutes(60)"); + feed.getLifecycle().getRetentionStage().setFrequency(frequency); + parser.validate(feed); // no validation exception should be thrown + + frequency = Frequency.fromString("hours(1)"); + feed.getLifecycle().getRetentionStage().setFrequency(frequency); + parser.validate(feed); // no validation exception should be thrown + } + + @Test(expectedExceptions = ValidationException.class, + expectedExceptionsMessageRegExp = ".*Retention can not be more frequent than data availability.*") + public void testRetentionFrequentThanFeed() throws Exception { + Feed feed = 
parser.parseAndValidate(this.getClass() + .getResourceAsStream(FEED3_XML)); + + feed.setFrequency(Frequency.fromString("hours(2)")); + Frequency frequency = Frequency.fromString("minutes(60)"); + feed.getLifecycle().getRetentionStage().setFrequency(frequency); + parser.validate(feed); + } + + @Test(expectedExceptions = ValidationException.class, + expectedExceptionsMessageRegExp = ".*Feed Retention can not be more frequent than.*") + public void testRetentionFrequency() throws Exception { + Feed feed = parser.parseAndValidate(this.getClass() + .getResourceAsStream(FEED3_XML)); + + feed.setFrequency(Frequency.fromString("minutes(30)")); + Frequency frequency = Frequency.fromString("minutes(59)"); + feed.getLifecycle().getRetentionStage().setFrequency(frequency); + parser.validate(feed); + } + @Test(expectedExceptions = ValidationException.class) public void applyValidationInvalidFeed() throws Exception { Feed feed = parser.parseAndValidate(ProcessEntityParserTest.class
