Repository: falcon Updated Branches: refs/heads/master 97e35874f -> bbfbe087c
FALCON-1616 Consume Workflow job end notifications for SLA monitoring. Contributed by Sandeep Samudrala. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bbfbe087 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bbfbe087 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bbfbe087 Branch: refs/heads/master Commit: bbfbe087cd2812820fc434fb3bdfa33484e779fb Parents: 97e3587 Author: Ajay Yadava <[email protected]> Authored: Tue Dec 8 16:35:24 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Tue Dec 8 16:35:24 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../falcon/service/FeedSLAMonitoringService.java | 11 +++++++++-- .../falcon/service/FeedSLAMonitoringTest.java | 17 +++++++++++++++++ src/conf/startup.properties | 2 +- 4 files changed, 29 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/bbfbe087/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4690c95..0b1e5bf 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -31,6 +31,8 @@ Trunk (Unreleased) FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1616 Consume Workflow job end notifications for SLA monitoring(Sandeep Samudrala via Ajay Yadava) + FALCON-1634 Add .reviewboardrc file so that review requests can be created using just command line(Rajat Khandelwal via Ajay Yadava) FALCON-1557 Supporting some Entity Management Api's and admin api in Falcon Unit (Narayan Periwal via Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/bbfbe087/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index d4383ec..b302539 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -97,7 +97,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen * Map<Pair<feedName, clusterName>, Set<instanceTime> to store * each missing instance of a feed. */ - private Map<Pair<String, String>, BlockingQueue<Date>> pendingInstances; + protected Map<Pair<String, String>, BlockingQueue<Date>> pendingInstances; /** @@ -266,6 +266,13 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen serializeState(); // store the state of monitoring service to the disk. } + public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime) { + LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName, + clusterName, nominalTime); + Pair<String, String> feedCluster = new Pair<>(feedName, clusterName); + pendingInstances.get(feedCluster).remove(nominalTime); + } + private FileSystem initializeFileSystem() { try { fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri()); @@ -437,7 +444,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen } } - private void initializeService() { + protected void initializeService() { pendingInstances = new ConcurrentHashMap<>(); lastCheckedAt = new Date(); lastSerializedAt = new Date(); http://git-wip-us.apache.org/repos/asf/falcon/blob/bbfbe087/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java index ca55d01..e3dd5cc 100644 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java +++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java @@ -105,4 +105,21 @@ public class FeedSLAMonitoringTest { AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "", "*"); AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*"); } + + @Test + public void testMakeFeedInstanceAvailable() { + Date instanceDate = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); + Date nextInstanceDate = SchemaHelper.parseDateUTC("2015-11-20T01:00Z"); + Pair<String, String> feedCluster = new Pair<>("testFeed", "testCluster"); + + BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>(); + missingInstances.add(instanceDate); + missingInstances.add(nextInstanceDate); + + FeedSLAMonitoringService.get().initializeService(); + FeedSLAMonitoringService.get().pendingInstances.put(feedCluster, missingInstances); + FeedSLAMonitoringService.get().makeFeedInstanceAvailable("testFeed", "testCluster", instanceDate); + + Assert.assertEquals(FeedSLAMonitoringService.get().pendingInstances.get(feedCluster).size(), 1); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/bbfbe087/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 5ddba01..1694930 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -92,7 +92,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3 ##### Workflow Job Execution Completion listeners ##### -*.workflow.execution.listeners= +*.workflow.execution.listeners=org.apache.falcon.handler.SLAMonitoringHandler ######### Implementation classes #########
