Repository: falcon Updated Branches: refs/heads/0.9 f451d548a -> eb598ee99
FALCON-1678 SLA Monitoring does not honour entity end date. Contributed by Ajay Yadava. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3451f2c8 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3451f2c8 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3451f2c8 Branch: refs/heads/0.9 Commit: 3451f2c8eebcbeebc874430250f8c3a5e820e2f4 Parents: f451d54 Author: Ajay Yadava <[email protected]> Authored: Mon Jan 11 14:42:45 2016 +0530 Committer: Ajay Yadava <[email protected]> Committed: Mon Jan 11 15:23:29 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/falcon/entity/FeedHelper.java | 9 +++ .../falcon/util/DeploymentProperties.java | 2 +- .../apache/falcon/entity/FeedHelperTest.java | 16 ++++ .../service/FeedSLAMonitoringService.java | 7 +- .../falcon/service/FeedSLAMonitoringTest.java | 85 ++++++++++++++++++-- 6 files changed, 110 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 437dc45..94010c3 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -94,6 +94,8 @@ Proposed Release Version: 0.9 OPTIMIZATIONS BUG FIXES + FALCON-1678 SLA Monitoring does not honour entity end date(Ajay Yadava) + FALCON-1708 params API does not take start as a mandatory option(Praveen Adlakha via Ajay Yadava) FALCON-1725 Falcon API shows results in ascending order in native scheduler (Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index 150e0bd..575ceb3 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -42,6 +42,7 @@ import org.apache.falcon.entity.v0.feed.MergeType; import org.apache.falcon.entity.v0.feed.Property; import org.apache.falcon.entity.v0.feed.RetentionStage; import org.apache.falcon.entity.v0.feed.Sla; +import org.apache.falcon.entity.v0.feed.Validity; import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Output; import org.apache.falcon.entity.v0.process.Process; @@ -1128,6 +1129,14 @@ public final class FeedHelper { return argsMap; } + public static Validity getClusterValidity(Feed feed, String clusterName) throws FalconException { + Cluster cluster = getCluster(feed, clusterName); + if (cluster == null) { + throw new FalconException("Invalid cluster: " + clusterName + " for feed: " + feed.getName()); + } + return cluster.getValidity(); + } + public static Frequency getOldRetentionFrequency(Feed feed) { Frequency feedFrequency = feed.getFrequency(); Frequency defaultFrequency = new Frequency("hours(24)"); http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java index 715b7ba..5879f30 100644 --- a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java +++ b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java @@ -31,7 +31,7 @@ public final class DeploymentProperties extends ApplicationProperties { private static final String PROPERTY_FILE = "deploy.properties"; private static final AtomicReference<DeploymentProperties> INSTANCE = - new AtomicReference<DeploymentProperties>(); + new AtomicReference<>(); private DeploymentProperties() throws FalconException { super(); http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java index 9841083..d565f94 100644 --- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java +++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java @@ -929,6 +929,22 @@ public class FeedHelperTest extends AbstractTestBase { Assert.assertEquals(startInstResult, feed.getClusters().getClusters().get(0).getValidity().getStart()); } + @Test + public void testGetFeedClusterValidity() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC"); + Validity validity = FeedHelper.getClusterValidity(feed, cluster.getName()); + Assert.assertEquals(validity.getStart(), getDate("2012-02-07 00:00 UTC")); + Assert.assertEquals(validity.getEnd(), getDate("2020-02-25 00:00 UTC")); + } + + @Test(expectedExceptions = FalconException.class) + public void testGetClusterValidityInvalidCluster() throws Exception { + Cluster cluster = publishCluster(); + Feed feed = publishFeed(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC"); + FeedHelper.getClusterValidity(feed, "abracadabra"); + } + private Validity getFeedValidity(String start, String end) throws ParseException { Validity validity = new Validity(); validity.setStart(getDate(start)); http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index 1cd571e..8ffecd8 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -80,7 +80,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen return SERVICE; } - private int queueSize; + protected int queueSize; /** * Permissions for storePath. @@ -90,7 +90,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen /** * Feeds to be monitored. */ - private Set<String> monitoredFeeds; + protected Set<String> monitoredFeeds; /** @@ -340,7 +340,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen org.apache.falcon.entity.v0.cluster.Cluster currentCluster = EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName()); nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime); - while (nextInstanceTime.before(to)) { + Date endDate = FeedHelper.getClusterValidity(feed, currentCluster.getName()).getEnd(); + while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) { if (instances.size() >= queueSize) { // if no space, first make some space LOG.debug("Removing instance={} for <feed,cluster>={}", instances.peek(), key); exists.remove(instances.peek()); http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java index e3dd5cc..90eec4d 100644 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java +++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java @@ -18,27 +18,42 @@ package org.apache.falcon.service; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; +import java.util.TimeZone; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + import org.apache.falcon.FalconException; import org.apache.falcon.Pair; +import org.apache.falcon.entity.AbstractTestBase; import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.parser.ValidationException; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.ClusterType; +import org.apache.falcon.entity.v0.feed.Clusters; +import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Sla; +import org.apache.falcon.entity.v0.feed.Validity; import org.apache.falcon.resource.AbstractSchedulableEntityManager; + import org.testng.Assert; import org.testng.annotations.Test; -import java.util.Date; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - /** * Tests for FeedSLAMonitoring Service. */ -public class FeedSLAMonitoringTest { +public class FeedSLAMonitoringTest extends AbstractTestBase { + private static final String CLUSTER_NAME = "testCluster"; + private static final String FEED_NAME = "testFeed"; + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); @Test public void testSLAStatus() throws FalconException { @@ -122,4 +137,60 @@ public class FeedSLAMonitoringTest { Assert.assertEquals(FeedSLAMonitoringService.get().pendingInstances.get(feedCluster).size(), 1); } + + @Test + public void testEndDateCheck() throws Exception { + Cluster cluster = publishCluster(); + publishFeed(cluster, "hours(1)", "2015-11-20 00:00 UTC", "2015-11-20 05:00 UTC"); + Pair<String, String> feedCluster = new Pair<>(FEED_NAME, CLUSTER_NAME); + + FeedSLAMonitoringService service = FeedSLAMonitoringService.get(); + service.initializeService(); + service.queueSize = 100; + service.monitoredFeeds.add(FEED_NAME); + Date from = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); + Date to = SchemaHelper.parseDateUTC("2015-11-25T00:00Z"); + service.addNewPendingFeedInstances(from, to); + // check that instances after feed's end date are not added. + Assert.assertEquals(service.pendingInstances.get(feedCluster).size(), 5); + } + + private Cluster publishCluster() throws FalconException { + Cluster cluster = new Cluster(); + cluster.setName(CLUSTER_NAME); + cluster.setColo("default"); + getStore().publish(EntityType.CLUSTER, cluster); + return cluster; + + } + + private Feed publishFeed(Cluster cluster, String frequency, String start, String end) + throws FalconException, ParseException { + Feed feed = new Feed(); + feed.setName(FEED_NAME); + Frequency f = new Frequency(frequency); + feed.setFrequency(f); + feed.setTimezone(UTC); + Clusters fClusters = new Clusters(); + org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster(); + fCluster.setType(ClusterType.SOURCE); + fCluster.setName(cluster.getName()); + fCluster.setValidity(getFeedValidity(start, end)); + fClusters.getClusters().add(fCluster); + feed.setClusters(fClusters); + getStore().publish(EntityType.FEED, feed); + return feed; + } + + private Validity getFeedValidity(String start, String end) throws ParseException { + Validity validity = new Validity(); + validity.setStart(getDate(start)); + validity.setEnd(getDate(end)); + return validity; + } + + private Date getDate(String dateString) throws ParseException { + DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z"); + return format.parse(dateString); + } }
