Repository: falcon Updated Branches: refs/heads/master 4783a19c4 -> 9872ce8e7
FALCON-1476 Maintaining threshold on monitoring entities for SLA service. Contributed by Ajay Yadava. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9872ce8e Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9872ce8e Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9872ce8e Branch: refs/heads/master Commit: 9872ce8e793f0d16ade52cb06e95bef50339e25a Parents: 4783a19 Author: Ajay Yadava <[email protected]> Authored: Thu Oct 1 16:22:30 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Thu Oct 1 16:22:30 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + common/src/main/resources/startup.properties | 4 ++ .../service/FeedSLAMonitoringService.java | 62 +++++++++++++++----- .../falcon/service/FeedSLAMonitoringTest.java | 4 +- src/conf/startup.properties | 4 ++ 5 files changed, 61 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b2104f4..7cc3efb 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,6 +15,8 @@ Trunk (Unreleased) FALCON-1027 Falcon proxy user support(Sowmya Ramesh) IMPROVEMENTS + FALCON-1476 Maintaining threshold on monitoring entities for SLA service(Ajay Yadava) + FALCON-592 Refactor FalconCLI to make it more manageable(Balu Vellanki via Ajay Yadava) FALCON-1472 Improvements in SLA service(Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index e853c5a..1da7d23 100644 --- a/common/src/main/resources/startup.properties +++ 
b/common/src/main/resources/startup.properties @@ -104,6 +104,10 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour *.feed.sla.serialization.frequency.millis=3600000 +# Maximum number of pending instances recorded per (feed, cluster) pair. Once this limit is reached, the oldest +# instances are evicted first (FIFO order). +*.feed.sla.queue.size=288 + # Do not change unless really sure # Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60 *.feed.sla.statusCheck.frequency.seconds=600 http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index 193aa64..b4e0427 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -52,7 +52,9 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -76,6 +78,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen return SERVICE; } + private int queueSize; + /** * Permissions for storePath. */ @@ -91,7 +95,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen * Map<Pair<feedName, clusterName>, Set<instanceTime> to store * each missing instance of a feed.
*/ - private Map<Pair<String, String>, Set<Date>> pendingInstances; + private Map<Pair<String, String>, BlockingQueue<Date>> pendingInstances; /** @@ -206,6 +210,9 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000"); lookAheadWindowMillis = Integer.valueOf(freq); + String size = StartupProperties.get().getProperty("feed.sla.queue.size", "288"); + queueSize = Integer.valueOf(size); + try { if (fileSystem.exists(filePath)) { deserialize(filePath); @@ -270,23 +277,31 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen void addNewPendingFeedInstances(Date from, Date to) throws FalconException { Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); for (String feedName : monitoredFeeds) { - Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); for (Cluster feedCluster : feed.getClusters().getClusters()) { if (currentClusters.contains(feedCluster.getName())) { Date nextInstanceTime = from; Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName()); - Set<Date> instances = pendingInstances.get(key); + BlockingQueue<Date> instances = pendingInstances.get(key); if (instances == null) { - instances = new HashSet<>(); + instances = new LinkedBlockingQueue<>(queueSize); } - + Set<Date> exists = new HashSet<>(instances); org.apache.falcon.entity.v0.cluster.Cluster currentCluster = EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName()); + nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime); while (nextInstanceTime.before(to)) { - nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime); - instances.add(nextInstanceTime); + if (instances.size() >= queueSize) { // if no space, first make some space + LOG.debug("Removing instance={} for <feed,cluster>={}", instances.peek(), key); + exists.remove(instances.peek()); + 
instances.remove(); + } + LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key); + if (exists.add(nextInstanceTime)) { + instances.add(nextInstanceTime); + } nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); + nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime); } pendingInstances.put(key, instances); } @@ -299,7 +314,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen * Checks the availability of all the pendingInstances and removes the ones which have become available. */ private void checkPendingInstanceAvailability() throws FalconException { - for (Map.Entry<Pair<String, String>, Set<Date>> entry: pendingInstances.entrySet()) { + for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry: pendingInstances.entrySet()) { for (Date date : entry.getValue()) { boolean status = checkFeedInstanceAvailability(entry.getKey().first, entry.getKey().second, date); if (status) { @@ -358,7 +373,26 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen private void deserialize(Path path) throws FalconException { try { Map<String, Object> state = deserializeInternal(path); - pendingInstances = (ConcurrentHashMap<Pair<String, String>, Set<Date>>) state.get("pendingInstances"); + pendingInstances = new ConcurrentHashMap<>(); + Map<Pair<String, String>, BlockingQueue<Date>> pendingInstancesCopy = + (Map<Pair<String, String>, BlockingQueue<Date>>) state.get("pendingInstances"); + // queue size can change during restarts, hence copy + for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry : pendingInstancesCopy.entrySet()) { + BlockingQueue<Date> value = new LinkedBlockingQueue<>(queueSize); + BlockingQueue<Date> oldValue = entry.getValue(); + LOG.debug("Number of old instances:{}, new queue size:{}", oldValue.size(), queueSize); + while (!oldValue.isEmpty()) { + Date instance = oldValue.remove(); + if (value.size() == queueSize) { 
// if full + LOG.debug("Deserialization: Removing value={} for <feed,cluster>={}", value.peek(), + entry.getKey()); + value.remove(); + } + LOG.debug("Deserialization Adding: key={} to <feed,cluster>={}", entry.getKey(), instance); + value.add(instance); + } + pendingInstances.put(entry.getKey(), value); + } lastCheckedAt = new Date((Long) state.get("lastCheckedAt")); lastSerializedAt = new Date((Long) state.get("lastSerializedAt")); monitoredFeeds = new ConcurrentHashSet<>(); // will be populated on the onLoad of entities. @@ -397,13 +431,13 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen * Start time and end time are both inclusive. * @param start start time, inclusive * @param end end time, inclusive - * @return + * @return Set of pending feed instances belonging to the given range which have missed SLA * @throws FalconException */ public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end) throws FalconException { Set<SchedulableEntityInstance> result = new HashSet<>(); - for (Map.Entry<Pair<String, String>, Set<Date>> feedInstances : pendingInstances.entrySet()) { + for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> feedInstances : pendingInstances.entrySet()) { Pair<String, String> feedClusterPair = feedInstances.getKey(); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first); Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); @@ -428,7 +462,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen * @param clusterName cluster name * @param start start time, inclusive * @param end end time, inclusive - * @return + * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA. 
* @throws FalconException */ public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(String feedName, String clusterName, @@ -436,7 +470,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen Set<SchedulableEntityInstance> result = new HashSet<>(); Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName); - Set<Date> missingInstances = pendingInstances.get(feedClusterPair); + BlockingQueue<Date> missingInstances = pendingInstances.get(feedClusterPair); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); Sla sla = FeedHelper.getSLA(cluster, feed); @@ -452,7 +486,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen return result; } - Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, Set<Date> missingInstances) + Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, BlockingQueue<Date> missingInstances) throws FalconException { String tagCritical = "Missed SLA High"; String tagWarn = "Missed SLA Low"; http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java index bc03cb5..ca55d01 100644 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java +++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java @@ -32,6 +32,8 @@ import org.testng.annotations.Test; import java.util.Date; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; /** * Tests for FeedSLAMonitoring Service. 
@@ -48,7 +50,7 @@ public class FeedSLAMonitoringTest { Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z"); Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z"); - Set<Date> missingInstances = new HashSet<>(); + BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>(); missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start time missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to start time missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between http://git-wip-us.apache.org/repos/asf/falcon/blob/9872ce8e/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 37cb044..ee1f141 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -102,6 +102,10 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour *.feed.sla.serialization.frequency.millis=3600000 +# Maximum number of pending instances recorded per (feed, cluster) pair. Once this limit is reached, the oldest +# instances are evicted first (FIFO order). +*.feed.sla.queue.size=288 + # Do not change unless really sure # Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60 *.feed.sla.statusCheck.frequency.seconds=600
