Repository: falcon Updated Branches: refs/heads/master 4591ffb61 -> 4b523130f
FALCON-1644 Retention : Some feed instances are never deleted by retention jobs. Contributed by Balu Vellanki. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4b523130 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4b523130 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4b523130 Branch: refs/heads/master Commit: 4b523130fbbcaff9e2ff6e577ceb52e3572aaa3c Parents: 4591ffb Author: Ajay Yadava <[email protected]> Authored: Tue Dec 15 21:54:56 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Tue Dec 15 21:54:56 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../org/apache/falcon/entity/FeedHelper.java | 18 +++++++++++ common/src/main/resources/runtime.properties | 4 +++ docs/src/site/twiki/FalconDocumentation.twiki | 6 +++- .../retention/AgeBasedCoordinatorBuilder.java | 10 +++++- .../feed/FeedRetentionCoordinatorBuilder.java | 8 +++++ .../feed/OozieFeedWorkflowBuilderTest.java | 33 +++++++++++++++----- src/conf/runtime.properties | 4 +++ webapp/src/test/resources/runtime.properties | 4 +++ 9 files changed, 80 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ce346a8..37c5d67 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -74,6 +74,8 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-1644 Retention : Some feed instances are never deleted by retention jobs(Balu Vellanki via Ajay Yadava) + FALCON-1641 Triage on an invalid feed instance throws IndexOutOfBoundException(Karishma Gulati via Ajay Yadava) FALCON-1572 Only one instance is running in a process when run using Native Scheduler(Pallavi Rao via Ajay Yadava) 
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index 138a43f..18d5152 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -38,6 +38,7 @@ import org.apache.falcon.entity.v0.feed.Location; import org.apache.falcon.entity.v0.feed.Locations; import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.feed.MergeType; +import org.apache.falcon.entity.v0.feed.Property; import org.apache.falcon.entity.v0.feed.RetentionStage; import org.apache.falcon.entity.v0.feed.Sla; import org.apache.falcon.entity.v0.process.Input; @@ -1030,4 +1031,21 @@ public final class FeedHelper { } return retentionFrequency; } + + public static int getRetentionLimitInSeconds(Feed feed, String clusterName) throws FalconException { + Frequency retentionLimit = new Frequency("minutes(0)"); + RetentionStage retentionStage = getRetentionStage(feed, clusterName); + if (retentionStage != null) { + for (Property property : retentionStage.getProperties().getProperties()) { + if (property.getName().equalsIgnoreCase("retention.policy.agebaseddelete.limit")) { + retentionLimit = new Frequency(property.getValue()); + break; + } + } + } else { + retentionLimit = getCluster(feed, clusterName).getRetention().getLimit(); + } + Long freqInMillis = DateUtil.getFrequencyInMillis(retentionLimit); + return (int) (freqInMillis/1000); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/common/src/main/resources/runtime.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties 
index f499dd9..643559e 100644 --- a/common/src/main/resources/runtime.properties +++ b/common/src/main/resources/runtime.properties @@ -25,6 +25,10 @@ *.falcon.replication.workflow.mapbandwidth=100 *.webservices.default.results.per.page=10 +# If true, do not run retention past feedCluster validity end time. +# This will retain recent instances beyond feedCluster validity end time. +*.falcon.retention.keep.instances.beyond.validity=true + # Default configs to handle replication for late arriving feeds. *.feed.late.allowed=true *.feed.late.frequency=hours(3) http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/docs/src/site/twiki/FalconDocumentation.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconDocumentation.twiki b/docs/src/site/twiki/FalconDocumentation.twiki index 95f388a..21b22ff 100644 --- a/docs/src/site/twiki/FalconDocumentation.twiki +++ b/docs/src/site/twiki/FalconDocumentation.twiki @@ -257,7 +257,11 @@ feed/data xml in the following manner for each cluster the feed can belong to : The 'limit' attribute can be specified in units of minutes/hours/days/months, and a corresponding numeric value can be attached to it. It essentially instructs the system to retain data till the time specified -in the attribute spanning backwards in time, from now. Any data older than that is erased from the system. +in the attribute spanning backwards in time, from now. Any data older than that is erased from the system. By default, +Falcon runs retention jobs only up to the cluster validity end time. As a result, instances created between +"endTime - retentionLimit" and endTime are never deleted and are retained forever. If users do not want to retain any +instances of the feed past the cluster validity end time, they should set the property +"falcon.retention.keep.instances.beyond.validity" to false in runtime.properties. With the integration of Hive, Falcon also provides retention for tables in Hive catalog. 
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java index 47ab2fc..7a25d86 100644 --- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java +++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java @@ -18,6 +18,7 @@ package org.apache.falcon.lifecycle.engine.oozie.retention; +import org.apache.commons.lang3.time.DateUtils; import org.apache.falcon.FalconException; import org.apache.falcon.LifeCycle; import org.apache.falcon.entity.EntityUtil; @@ -33,6 +34,7 @@ import org.apache.falcon.oozie.coordinator.ACTION; import org.apache.falcon.oozie.coordinator.CONTROLS; import org.apache.falcon.oozie.coordinator.COORDINATORAPP; import org.apache.falcon.oozie.coordinator.WORKFLOW; +import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -73,7 +75,13 @@ public final class AgeBasedCoordinatorBuilder { COORDINATORAPP coord = new COORDINATORAPP(); String coordName = EntityUtil.getWorkflowName(LifeCycle.EVICTION.getTag(), feed).toString(); coord.setName(coordName); - coord.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd())); + Date endDate = feedCluster.getValidity().getEnd(); + if (RuntimeProperties.get().getProperty( + "falcon.retention.keep.instances.beyond.validity", "true").equalsIgnoreCase("false")) { + int retentionLimitinSecs = FeedHelper.getRetentionLimitInSeconds(feed, cluster.getName()); + endDate = 
DateUtils.addSeconds(endDate, retentionLimitinSecs); + } + coord.setEnd(SchemaHelper.formatDateUTC(endDate)); coord.setStart(SchemaHelper.formatDateUTC(new Date())); coord.setTimezone(feed.getTimezone().getID()); http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java index 69ca2c3..9f05a09 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java @@ -35,6 +35,7 @@ import org.apache.falcon.oozie.coordinator.ACTION; import org.apache.falcon.oozie.coordinator.COORDINATORAPP; import org.apache.falcon.oozie.coordinator.WORKFLOW; import org.apache.falcon.util.DateUtil; +import org.apache.falcon.util.RuntimeProperties; import org.apache.hadoop.fs.Path; import java.util.Arrays; @@ -59,8 +60,15 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee COORDINATORAPP coord = new COORDINATORAPP(); String coordName = getEntityName(); coord.setName(coordName); + Date endDate = feedCluster.getValidity().getEnd(); + if (RuntimeProperties.get().getProperty( + "falcon.retention.keep.instances.beyond.validity", "true").equalsIgnoreCase("false")) { + int retentionLimitinSecs = FeedHelper.getRetentionLimitInSeconds(entity, cluster.getName()); + endDate = DateUtils.addSeconds(endDate, retentionLimitinSecs); + } coord.setEnd(SchemaHelper.formatDateUTC(endDate)); + if (feedCluster.getValidity().getEnd().before(new Date())) { Date startDate = DateUtils.addMinutes(endDate, -1); coord.setStart(SchemaHelper.formatDateUTC(startDate)); 
http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index d034b1a..9388c68 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -18,6 +18,7 @@ package org.apache.falcon.oozie.feed; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.DateUtils; import org.apache.falcon.FalconException; import org.apache.falcon.Tag; import org.apache.falcon.cluster.util.EmbeddedCluster; @@ -51,6 +52,7 @@ import org.apache.falcon.oozie.workflow.WORKFLOWAPP; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.service.LifecyclePolicyMap; +import org.apache.falcon.util.DateUtil; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowExecutionArgs; @@ -161,21 +163,30 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { trgMiniDFS.shutdown(); } - @Test - public void testRetentionWithLifecycle() throws Exception { + @DataProvider(name = "keepInstancesPostValidity") + private Object[][] keepInstancesPostValidity() { + return new Object[][] { + {"false", "2099-01-01T02:00Z"}, + {"true", "2099-01-01T00:00Z"}, + }; + } + + @Test(dataProvider = "keepInstancesPostValidity") + public void testRetentionWithLifecycle(String keepInstancesPostValidity, String endTime) throws Exception { + RuntimeProperties.get().setProperty("falcon.retention.keep.instances.beyond.validity", + keepInstancesPostValidity); OozieEntityBuilder builder = 
OozieEntityBuilder.get(lifecycleRetentionFeed); Path bundlePath = new Path("/projects/falcon/"); builder.build(trgCluster, bundlePath); BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath); List<COORDINATOR> coords = bundle.getCoordinator(); - COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath()); assertLibExtensions(coord, "retention"); HashMap<String, String> props = getCoordProperties(coord); Assert.assertEquals(props.get("ENTITY_PATH"), bundlePath.toString() + "/RETENTION"); Assert.assertEquals(coord.getFrequency(), "${coord:hours(17)}"); - Assert.assertEquals(coord.getEnd(), "2099-01-01T00:00Z"); + Assert.assertEquals(coord.getEnd(), endTime); Assert.assertEquals(coord.getTimezone(), "UTC"); HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord); @@ -187,6 +198,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(wfProps.get("jobPriority"), "LOW"); } + @Test public void testRetentionFrequency() throws Exception { feed.setFrequency(new Frequency("minutes(36000)")); @@ -675,9 +687,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName()); - final Calendar instance = Calendar.getInstance(); - instance.roll(Calendar.YEAR, 1); - cluster.getValidity().setEnd(instance.getTime()); + Calendar startCal = Calendar.getInstance(); + Calendar endCal = Calendar.getInstance(); + endCal.roll(Calendar.DATE, 1); + cluster.getValidity().setEnd(endCal.getTime()); + RuntimeProperties.get().setProperty("falcon.retention.keep.instances.beyond.validity", "false"); OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(feed, Tag.RETENTION); List<Properties> coords = builder.buildCoords( @@ -689,6 +703,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + 
feed.getName()); Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}"); + Assert.assertEquals(coord.getStart(), DateUtil.getDateFormatFromTime(startCal.getTimeInMillis())); + Date endDate = DateUtils.addSeconds(endCal.getTime(), + FeedHelper.getRetentionLimitInSeconds(feed, srcCluster.getName())); + Assert.assertEquals(coord.getEnd(), DateUtil.getDateFormatFromTime(endDate.getTime())); + HashMap<String, String> props = getCoordProperties(coord); HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/src/conf/runtime.properties ---------------------------------------------------------------------- diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties index 4bc1bc0..f535b0f 100644 --- a/src/conf/runtime.properties +++ b/src/conf/runtime.properties @@ -41,6 +41,10 @@ falcon.current.colo=local *.feed.late.frequency=hours(3) *.feed.late.policy=exp-backoff +# If true, do not run retention past feedCluster validity end time. +# This will retain recent instances beyond feedCluster validity end time. +*.falcon.retention.keep.instances.beyond.validity=true + # If true, Falcon skips oozie dryrun while scheduling entities. *.falcon.skip.dryrun=false http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/webapp/src/test/resources/runtime.properties ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/runtime.properties b/webapp/src/test/resources/runtime.properties index fec9e44..7dec191 100644 --- a/webapp/src/test/resources/runtime.properties +++ b/webapp/src/test/resources/runtime.properties @@ -25,6 +25,10 @@ *.falcon.replication.workflow.mapbandwidth=100 *.webservices.default.results.per.page=10 +# If true, do not run retention past feedCluster validity end time. +# This will retain recent instances beyond feedCluster validity end time. 
+*.falcon.retention.keep.instances.beyond.validity=true + # Default configs to handle replication for late arriving feeds. *.feed.late.allowed=true *.feed.late.frequency=hours(3)
