Repository: falcon Updated Branches: refs/heads/master f7ad3f487 -> 879204671
FALCON-1473 Feed SLA Miss Alerts through REST API. Contributed by Ajay Yadava. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/87920467 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/87920467 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/87920467 Branch: refs/heads/master Commit: 87920467185d895c71c29b4b13445e5593fc2f42 Parents: f7ad3f4 Author: Ajay Yadava <[email protected]> Authored: Tue Sep 29 13:58:15 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Tue Sep 29 13:58:15 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/falcon/ResponseHelper.java | 14 ++ .../java/org/apache/falcon/cli/FalconCLI.java | 15 +- .../org/apache/falcon/client/FalconClient.java | 21 +++ .../apache/falcon/entity/v0/SchemaHelper.java | 2 +- .../resource/SchedulableEntityInstance.java | 22 ++- .../SchedulableEntityInstanceResult.java | 86 ++++++++++ .../org/apache/falcon/entity/FeedHelper.java | 2 +- .../falcon/entity/parser/FeedEntityParser.java | 2 +- .../lifecycle/retention/AgeBasedDelete.java | 2 +- common/src/main/resources/startup.properties | 14 ++ docs/src/site/twiki/FalconCLI.twiki | 48 ++++++ docs/src/site/twiki/restapi/FeedSLA.twiki | 56 +++++++ docs/src/site/twiki/restapi/ResourceList.twiki | 1 + .../falcon/resource/AbstractEntityManager.java | 4 +- .../AbstractSchedulableEntityManager.java | 70 +++++++++ .../proxy/SchedulableEntityManagerProxy.java | 37 ++++- .../service/FeedSLAMonitoringService.java | 157 ++++++++++++++++--- .../falcon/service/FeedSLAMonitoringTest.java | 106 +++++++++++++ src/conf/startup.properties | 10 ++ .../resource/SchedulableEntityManager.java | 20 +++ 21 files changed, 655 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/CHANGES.txt 
---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d18d5aa..0c03cf2 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,8 @@ Trunk (Unreleased) FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao) NEW FEATURES + FALCON-1473 Feed SLA Miss Alerts through REST API(Ajay Yadava) + FALCON-965 Open up life cycle stage implementation within Falcon for extension(Ajay Yadava) FALCON-1437 Change DR recipes notification with Falcon notification(Peeyush Bishnoi via Sowmya Ramesh) http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/ResponseHelper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/ResponseHelper.java b/client/src/main/java/org/apache/falcon/ResponseHelper.java index a13682b..8f22af7 100644 --- a/client/src/main/java/org/apache/falcon/ResponseHelper.java +++ b/client/src/main/java/org/apache/falcon/ResponseHelper.java @@ -26,6 +26,7 @@ import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; +import org.apache.falcon.resource.SchedulableEntityInstanceResult; import org.apache.falcon.resource.TriageResult; import java.util.Date; @@ -287,4 +288,17 @@ public final class ResponseHelper { sb.append("\nRequest Id: ").append(dependencyResult.getRequestId()); return sb.toString(); } + + public static String getString(SchedulableEntityInstanceResult instances) { + StringBuilder sb = new StringBuilder(); + String results = instances.toString(); + if (StringUtils.isEmpty(results)) { + sb.append("No sla miss found!"); + } else { + sb.append(results); + } + sb.append("\n\nResponse: ").append(instances.getMessage()); + sb.append("\nRequest Id: ").append(instances.getRequestId()); + return 
sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/cli/FalconCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java index c914649..1574585 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java @@ -37,6 +37,7 @@ import org.apache.falcon.resource.FeedLookupResult; import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; +import org.apache.falcon.resource.SchedulableEntityInstanceResult; import java.io.IOException; import java.io.InputStream; @@ -86,6 +87,7 @@ public class FalconCLI { public static final String SUMMARY_OPT = "summary"; public static final String DEFINITION_OPT = "definition"; public static final String DEPENDENCY_OPT = "dependency"; + public static final String SLA_MISS_ALERT_OPT = "slaAlert"; public static final String LOOKUP_OPT = "lookup"; public static final String PATH_OPT = "path"; public static final String LIST_OPT = "list"; @@ -420,6 +422,7 @@ public class FalconCLI { String entityName = commandLine.getOptionValue(ENTITY_NAME_OPT); String filePath = commandLine.getOptionValue(FILE_PATH_OPT); String colo = commandLine.getOptionValue(COLO_OPT); + colo = getColo(colo); String cluster = commandLine.getOptionValue(CLUSTER_OPT); String start = commandLine.getOptionValue(START_OPT); String end = commandLine.getOptionValue(END_OPT); @@ -462,7 +465,15 @@ public class FalconCLI { validateSortOrder(sortOrder); String entityAction = "entity"; - if (optionsList.contains(SUBMIT_OPT)) { + if (optionsList.contains(SLA_MISS_ALERT_OPT)) { + validateNotEmpty(entityType, ENTITY_TYPE_OPT); + validateNotEmpty(start, START_OPT); + parseDateString(start); 
+ parseDateString(end); + SchedulableEntityInstanceResult response = client.getFeedSlaMissPendingAlerts(entityType, + entityName, start, end, colo); + result = ResponseHelper.getString(response); + } else if (optionsList.contains(SUBMIT_OPT)) { validateNotEmpty(filePath, "file"); validateColo(optionsList); result = client.submit(entityType, filePath, doAsUser).getMessage(); @@ -710,6 +721,7 @@ public class FalconCLI { Option list = new Option(LIST_OPT, false, "List entities registered for a type"); Option lookup = new Option(LOOKUP_OPT, false, "Lookup a feed given its instance's path"); + Option slaAlert = new Option(SLA_MISS_ALERT_OPT, false, "Get missing feed instances which missed SLA"); Option entitySummary = new Option(SUMMARY_OPT, false, "Get summary of instances for list of entities"); Option touch = new Option(TOUCH_OPT, false, @@ -729,6 +741,7 @@ public class FalconCLI { group.addOption(dependency); group.addOption(list); group.addOption(lookup); + group.addOption(slaAlert); group.addOption(entitySummary); group.addOption(touch); http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 981559b..20f6447 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -42,6 +42,7 @@ import org.apache.falcon.resource.InstanceDependencyResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.falcon.resource.LineageGraphResult; +import org.apache.falcon.resource.SchedulableEntityInstanceResult; import org.apache.falcon.resource.TriageResult; import org.apache.hadoop.security.authentication.client.AuthenticatedURL; 
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; @@ -177,6 +178,7 @@ public class FalconClient extends AbstractFalconClient { return currentToken; } + /** * Methods allowed on Entity Resources. */ @@ -195,6 +197,7 @@ public class FalconClient extends AbstractFalconClient { SUMMARY("api/entities/summary", HttpMethod.GET, MediaType.APPLICATION_JSON), LOOKUP("api/entities/lookup/", HttpMethod.GET, MediaType.APPLICATION_JSON), DEPENDENCY("api/entities/dependencies/", HttpMethod.GET, MediaType.TEXT_XML), + SLA("api/entities/sla-alert", HttpMethod.GET, MediaType.APPLICATION_JSON), TOUCH("api/entities/touch", HttpMethod.POST, MediaType.TEXT_XML); private String path; @@ -381,6 +384,24 @@ public class FalconClient extends AbstractFalconClient { //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + public SchedulableEntityInstanceResult getFeedSlaMissPendingAlerts(String entityType, String entityName, + String startTime, String endTime, String colo) throws FalconCLIException { + + WebResource resource = service.path(Entities.SLA.path).path(entityType).queryParam("start", startTime) + .queryParam("colo", colo); + if (endTime != null) { + resource = resource.queryParam("end", endTime); + } + if (entityName != null) { + resource = resource.queryParam("name", entityName); + } + ClientResponse clientResponse = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(Entities.SLA.mimeType).type(MediaType.APPLICATION_JSON) + .method(Entities.SLA.method, ClientResponse.class); + checkIfSuccessful(clientResponse); + return clientResponse.getEntity(SchedulableEntityInstanceResult.class); + } + public TriageResult triage(String entityType, String entityName, String instanceTime, String colo) throws FalconCLIException { ClientResponse clientResponse = service http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java 
---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java b/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java index 62b810c..1c02f37 100644 --- a/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java +++ b/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java @@ -54,7 +54,7 @@ public final class SchemaHelper { try { return getDateFormat().parse(dateStr); } catch (ParseException e) { - throw new RuntimeException(e); + throw new RuntimeException("Unable to parse date: " + dateStr, e); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java index f5be63d..0968734 100644 --- a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java +++ b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java @@ -27,7 +27,7 @@ import java.util.Date; /** * Instance of a Schedulable Entity (Feed/Process). 
*/ -public class SchedulableEntityInstance { +public class SchedulableEntityInstance implements Comparable<SchedulableEntityInstance> { public static final String INPUT = "Input"; public static final String OUTPUT = "Output"; @@ -152,4 +152,24 @@ public class SchedulableEntityInstance { } return result; } + + @Override + public int compareTo(SchedulableEntityInstance o) { + int result = this.cluster.compareTo(o.cluster); + if (result != 0) { + return result; + } + + result = this.entityType.compareTo(o.entityType); + if (result != 0) { + return result; + } + + result = this.entityName.compareToIgnoreCase(o.entityName); + if (result != 0) { + return result; + } + + return this.instanceTime.compareTo(o.instanceTime); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstanceResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstanceResult.java b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstanceResult.java new file mode 100644 index 0000000..752c48d --- /dev/null +++ b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstanceResult.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Arrays; + +/** + * Instances list used for marshalling / unmarshalling with REST calls. + */ +@XmlRootElement(name = "instances") +@XmlAccessorType(XmlAccessType.FIELD) +@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) +public class SchedulableEntityInstanceResult extends APIResult { + + @XmlElement(name = "instances") + private SchedulableEntityInstance[] instances; + + //For JAXB + private SchedulableEntityInstanceResult() { + super(); + } + + public SchedulableEntityInstanceResult(Status status, String message) { + super(status, message); + } + + public SchedulableEntityInstance[] getInstances() { + return instances; + } + + public void setInstances(SchedulableEntityInstance[] instances) { + this.instances = instances; + } + + + @Override + public Object[] getCollection() { + return getInstances(); + } + + @Override + public void setCollection(Object[] items) { + if (items == null) { + setInstances(new SchedulableEntityInstance[0]); + } else { + SchedulableEntityInstance[] newInstances = new SchedulableEntityInstance[items.length]; + for (int index = 0; index < items.length; index++) { + newInstances[index] = (SchedulableEntityInstance)items[index]; + } + setInstances(newInstances); + } + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + if 
(instances != null) { + Arrays.sort(instances); + for (SchedulableEntityInstance element : instances) { + buffer.append(element.toString()); + buffer.append("\n"); + } + } + return buffer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index 79f1959..5c252a8 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -264,7 +264,7 @@ public final class FeedHelper { return null; } - public static Sla getSLAs(Cluster cluster, Feed feed) { + public static Sla getSLA(Cluster cluster, Feed feed) { final Sla clusterSla = cluster.getSla(); if (clusterSla != null) { return clusterSla; http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java index c73cc78..6be2495 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java @@ -152,7 +152,7 @@ public class FeedEntityParser extends EntityParser<Feed> { private void validateFeedSLA(Feed feed) throws FalconException { for (Cluster cluster : feed.getClusters().getClusters()) { - Sla clusterSla = FeedHelper.getSLAs(cluster, feed); + Sla clusterSla = FeedHelper.getSLA(cluster, feed); if (clusterSla != null) { Frequency slaLowExpression = clusterSla.getSlaLow(); ExpressionHelper evaluator = ExpressionHelper.get(); 
http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java index 0a1810e..a4ae780 100644 --- a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java +++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java @@ -69,7 +69,7 @@ public class AgeBasedDelete extends RetentionPolicy { private void validateLimitWithSla(Feed feed, Cluster cluster, String retentionExpression) throws FalconException { // test that slaHigh is less than retention - Sla clusterSla = FeedHelper.getSLAs(cluster, feed); + Sla clusterSla = FeedHelper.getSLA(cluster, feed); if (clusterSla != null) { ExpressionHelper evaluator = ExpressionHelper.get(); ExpressionHelper.setReferenceDate(new Date()); http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 357b90c..3383129 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -98,6 +98,20 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle *.falcon.cleanup.service.frequency=minutes(5) +######### Properties for Feed SLA Monitoring ######### +# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour +*.feed.sla.serialization.frequency.millis=3600000 + +# Do not change unless really sure +# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60 +*.feed.sla.statusCheck.frequency.seconds=600 + +# Do not change 
unless really sure +# Time Duration (in milliseconds) in future for generating pending feed instances. +# In every cycle pending feed instances are added for monitoring, till this time in future. +# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000 +*.feed.sla.lookAheadWindow.millis=900000 + ######### Properties for configuring JMS provider - activemq ######### # Default Active MQ url http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index 4f72bf8..22003d3 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ b/docs/src/site/twiki/FalconCLI.twiki @@ -145,6 +145,54 @@ $FALCON_HOME/bin/falcon entity -type feed -lookup -path /data/projects/my-hourly If you have multiple feeds with location as /data/projects/my-hourly/${YEAR}/${MONTH}/${DAY}/${HOUR} then this command will return all of them. +---+++SLAAlert +<verbatim> +Since: 0.8 +</verbatim> + +This command lists all the feed instances which have missed sla and are still not available. If a feed instance missed +sla but is now available, then it will not be reported in results. The purpose of this API is alerting and hence it + doesn't return feed instances which missed SLA but are available as they don't require any action. + +* Currently sla monitoring is supported only for feeds. + +* Option end is optional and will default to current time if missing. + +* Option name is optional, if provided only instances of that feed will be considered. 
+ +Usage: + +*Example 1* + +*$FALCON_HOME/bin/falcon entity -type feed -start 2014-09-05T00:00Z -slaAlert -end 2016-05-03T00:00Z -colo local* + +name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T11:59Z, tags: Missed SLA High +name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:00Z, tags: Missed SLA High +name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:01Z, tags: Missed SLA High +name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:02Z, tags: Missed SLA High +name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:03Z, tags: Missed SLA High +name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:04Z, tags: Missed SLA High +name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:05Z, tags: Missed SLA High +name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:06Z, tags: Missed SLA High +name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:07Z, tags: Missed SLA High +name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:08Z, tags: Missed SLA Low + + +Response: default/Success! + +Request Id: default/216978070@qtp-830047511-4 - f5a6c129-ab42-4feb-a2bf-c3baed356248 + +*Example 2* + +*$FALCON_HOME/bin/falcon entity -type feed -start 2014-09-05T00:00Z -slaAlert -end 2016-05-03T00:00Z -colo local -name in* + +name: in, type: FEED, cluster: local, instanceTime: 2015-09-26T06:00Z, tags: Missed SLA High + +Response: default/Success! 
+ +Request Id: default/1580107885@qtp-830047511-7 - f16cbc51-5070-4551-ad25-28f75e5e4cf2 + + ---++Instance Management Options ---+++Kill http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/docs/src/site/twiki/restapi/FeedSLA.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/FeedSLA.twiki b/docs/src/site/twiki/restapi/FeedSLA.twiki new file mode 100644 index 0000000..5fd984e --- /dev/null +++ b/docs/src/site/twiki/restapi/FeedSLA.twiki @@ -0,0 +1,56 @@ +---++ GET /api/entities/sla-alert/:entity-type + * <a href="#Description">Description</a> + * <a href="#Parameters">Parameters</a> + * <a href="#Results">Results</a> + * <a href="#Examples">Examples</a> + +---++ Description +<verbatim> +Since: 0.8 +</verbatim> +This command lists all the feed instances which have missed sla and are still not available. If a feed instance missed +sla but is now available, then it will not be reported in results. The purpose of this API is alerting and hence it + doesn't return feed instances which missed SLA but are available as they don't require any action. + +---++ Parameters + * :entity-type Only valid option is feed. + * name <optional param> parameter to restrict results for a particular feed using feed's name. + * start <mandatory param> start of the time window for nominal instances, inclusive. + * end <optional param> end of the time window for nominal instances to be considered, default is treated as current time. + * colo <optional param> name of the colo + + +---++ Results +Pending feed instances which missed SLA. 
+ +---++ Examples +---+++ Rest Call +<verbatim> +GET http://localhost:15000/api/entities/sla-alert/feed?colo=*&start=2012-04-03T07:00Z +</verbatim> +---+++ Result +<verbatim> +{ + "status":"SUCCEEDED", + "message":"default/Success!\n", + "requestId":"default/885720178@qtp-495452957-6 - f6e82e9b-d23f-466b-82df-4fb8293ce9cf\n", + "instances":[ + {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:33:00+05:30","tags":"Missed SLA High"}, + {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:29:00+05:30","tags":"Missed SLA High"}, + {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:35:00+05:30","tags":"Missed SLA Low"}, + {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:30:00+05:30","tags":"Missed SLA High"}, + {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:34:00+05:30","tags":"Missed SLA High"}, + {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:31:00+05:30","tags":"Missed SLA High"}, + {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:32:00+05:30","tags":"Missed SLA High"} + ] +} +</verbatim> + +In case there are no pending instances which have missed sla the response will be like below: +<verbatim> +{ + "status":"SUCCEEDED", + "message":"default/Success!\n", + "requestId":"default/979808239@qtp-1243851750-3 - 8c7396c0-efe2-43e9-9aea-7ae6afea5fd6\n" +} +</verbatim> http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/docs/src/site/twiki/restapi/ResourceList.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki index ea3e3b6..34c2c6f 100644 --- a/docs/src/site/twiki/restapi/ResourceList.twiki +++ b/docs/src/site/twiki/restapi/ResourceList.twiki @@ -54,6 +54,7 @@ The 
current version of the rest api's documentation is also hosted on the Falcon | GET | [[EntityList][api/entities/list/:entity-type]] | Get the list of entities | | GET | [[EntitySummary][api/entities/summary/:entity-type/:cluster]] | Get instance summary of all entities | | GET | [[EntityDependencies][api/entities/dependencies/:entity-type/:entity-name]] | Get the dependencies of the entity | +| GET | [[FeedSLA][api/entities/sla-alert/:entity-type]] | Get pending feed instances which missed sla | | GET | [[FeedLookup][api/entities/lookup/feed/]] | Get feed for given path | ---++ REST Call on Feed and Process Instances http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index 2682257..a6a0e4c 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -101,7 +101,7 @@ public abstract class AbstractEntityManager { return result; } - protected void checkColo(String colo) { + protected static void checkColo(String colo) { if (DeploymentUtil.isEmbeddedMode()) { return; } @@ -146,7 +146,7 @@ public abstract class AbstractEntityManager { return DeploymentUtil.getDefaultColos(); } - if (EntityType.getEnum(type) == EntityType.CLUSTER) { + if (EntityType.getEnum(type) == EntityType.CLUSTER || name == null) { return getAllColos(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java 
b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java index 3280789..63e0647 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java @@ -24,12 +24,15 @@ import org.apache.falcon.FalconWebException; import org.apache.falcon.Pair; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.lock.MemoryLocks; +import org.apache.falcon.entity.parser.ValidationException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.UnschedulableEntityException; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.monitors.Dimension; +import org.apache.falcon.service.FeedSLAMonitoringService; +import org.apache.falcon.util.DeploymentUtil; import org.apache.hadoop.security.authorize.AuthorizationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,6 +106,73 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM } } + public static void validateSlaParams(String entityType, String entityName, String start, String end, + String colo) throws FalconException { + EntityType type = EntityType.getEnum(entityType); + if (type != EntityType.FEED) { + throw new ValidationException("SLA monitoring is not supported for: " + type); + } + + // validate valid feed name. 
+ if (StringUtils.isNotBlank(entityName)) { + EntityUtil.getEntity(EntityType.FEED, entityName); + } + + Date startTime, endTime; + // validate mandatory start date + if (StringUtils.isBlank(start)) { + throw new ValidationException("'start' is mandatory and can not be blank."); + } else { + startTime = SchemaHelper.parseDateUTC(start); + } + + // validate optional end date + if (StringUtils.isBlank(end)) { + endTime = new Date(); + } else { + endTime = SchemaHelper.parseDateUTC(end); + } + + if (startTime.after(endTime)) { + throw new ValidationException("start can not be after end"); + } + + checkColo(colo); + } + + /** + * Returns the feed instances which are not yet available and have missed either slaLow or slaHigh. + * This api doesn't return the feeds which missed SLA but are now available. Purpose of this api is to show feed + * instances which you need to attend to. + * @param startStr startTime in + * @param endStr + */ + public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts(String feedName, String startStr, String endStr, + String colo) { + + Set<SchedulableEntityInstance> instances = new HashSet<>(); + try { + checkColo(colo); + Date start = EntityUtil.parseDateUTC(startStr); + Date end = (endStr == null) ? new Date() : EntityUtil.parseDateUTC(endStr); + + if (StringUtils.isBlank(feedName)) { + instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(start, end)); + } else { + for (String clusterName : DeploymentUtil.getCurrentClusters()) { + instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(feedName, + clusterName, start, end)); + } + } + } catch (FalconException e) { + throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST); + } + SchedulableEntityInstanceResult result = new SchedulableEntityInstanceResult(APIResult.Status.SUCCEEDED, + "Success!"); + result.setCollection(instances.toArray()); + return result; + } + /** * Submits a new entity and schedules it immediately. 
* http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 47038e5..9d13d74 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -34,19 +34,20 @@ import org.apache.falcon.resource.AbstractSchedulableEntityManager; import org.apache.falcon.resource.EntityList; import org.apache.falcon.resource.EntitySummaryResult; import org.apache.falcon.resource.FeedLookupResult; +import org.apache.falcon.resource.SchedulableEntityInstanceResult; import org.apache.falcon.resource.channel.Channel; import org.apache.falcon.resource.channel.ChannelFactory; - import org.apache.falcon.util.DeploymentUtil; + import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; -import javax.ws.rs.DefaultValue; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; -import javax.ws.rs.POST; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; @@ -110,6 +111,36 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana return new BufferedRequest(request); } + @GET + @Path("sla-alert/{type}") + @Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML}) + @Monitored(event = "feed-sla-misses") + public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts( + @Dimension("entityType") @PathParam("type") final String entityType, + @Dimension("entityName") @QueryParam("name") final 
String entityName, + @Dimension("start") @QueryParam("start") final String start, + @Dimension("end") @QueryParam("end") final String end, + @Dimension("colo") @QueryParam("colo") final String colo) { + try { + validateSlaParams(entityType, entityName, start, end, colo); + } catch (Exception e) { + throw FalconWebException.newException(e, Response.Status.BAD_REQUEST); + } + return new EntityProxy<SchedulableEntityInstanceResult>(entityType, entityName, + SchedulableEntityInstanceResult.class) { + @Override + protected Set<String> getColosToApply() { + return getApplicableColos(entityType, entityName); + } + + @Override + protected SchedulableEntityInstanceResult doExecute(String colo) throws FalconException { + return getEntityManager(colo).invoke("getFeedSLAMissPendingAlerts", entityType, entityName, + start, end, colo); + } + }.execute(); + } + /** * Submit the given entity. * @param request Servlet Request http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index 8bf43b8..193aa64 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -25,15 +25,20 @@ import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.FeedInstanceStatus; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.feed.Cluster; import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Sla; +import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.hadoop.HadoopClientFactory; 
+import org.apache.falcon.resource.SchedulableEntityInstance; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.StartupProperties; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,19 +52,30 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Service to monitor Feed SLAs. */ -public class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService { +public final class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService { private static final Logger LOG = LoggerFactory.getLogger("FeedSLA"); private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000); private static final int ONE_MS = 1; + private static final FeedSLAMonitoringService SERVICE = new FeedSLAMonitoringService(); + + private FeedSLAMonitoringService() { + + } + + public static FeedSLAMonitoringService get() { + return SERVICE; + } + /** * Permissions for storePath. */ @@ -68,31 +84,31 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa /** * Feeds to be monitored. */ - private static Set<String> monitoredFeeds; + private Set<String> monitoredFeeds; /** * Map<Pair<feedName, clusterName>, Set<instanceTime> to store * each missing instance of a feed. */ - private static Map<Pair<String, String>, Set<Date>> pendingInstances; + private Map<Pair<String, String>, Set<Date>> pendingInstances; /** * Used to store the last time when pending instances were checked for SLA. 
*/ - private static Date lastCheckedAt; + private Date lastCheckedAt; /** * Used to store last time when the state was serialized to the store. */ - private static Date lastSerializedAt; + private Date lastSerializedAt; /** * Frequency in seconds of "status check" for pending feed instances. */ - private static final int STATUS_CHECK_FREQUENCY_SECS = 10 * 60; // 10 minutes + private int statusCheckFrequencySeconds; // default: 10 minutes /** @@ -100,7 +116,7 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa * * In every cycle pending feed instances are added for monitoring, till this time in future. */ - private static final int LOOKAHEAD_WINDOW_MILLIS = 15 * 60 * 1000; // 15 MINUTES + private int lookAheadWindowMillis; // default: 15 minutes /** @@ -128,12 +144,14 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa if (entity.getEntityType() == EntityType.FEED) { Feed feed = (Feed) entity; // currently sla service is enabled only for fileSystemStorage - if (feed.getLocations() != null && feed.getSla() != null) { + if (feed.getLocations() != null) { Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); for (Cluster cluster : feed.getClusters().getClusters()) { if (currentClusters.contains(cluster.getName())) { - LOG.debug("Adding feed:{} for monitoring", feed.getName()); - monitoredFeeds.add(feed.getName()); + if (FeedHelper.getSLA(cluster, feed) != null) { + LOG.debug("Adding feed:{} for monitoring", feed.getName()); + monitoredFeeds.add(feed.getName()); + } } } } @@ -180,13 +198,14 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa fileSystem = initializeFileSystem(); String freq = StartupProperties.get().getProperty("feed.sla.serialization.frequency.millis", ONE_HOUR); - try { - serializationFrequencyMillis = Integer.valueOf(freq); - } catch (NumberFormatException e) { - LOG.error("Invalid value : {} found in startup.properties for the property " - + 
"feed.sla.serialization.frequency.millis Should be an integer", freq); - throw new FalconException("Invalid integer value for property ", e); - } + serializationFrequencyMillis = Integer.valueOf(freq); + + freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600"); + statusCheckFrequencySeconds = Integer.valueOf(freq); + + freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000"); + lookAheadWindowMillis = Integer.valueOf(freq); + try { if (fileSystem.exists(filePath)) { deserialize(filePath); @@ -198,7 +217,7 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa throw new FalconException("Couldn't check the existence of " + filePath, e); } ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - executor.scheduleWithFixedDelay(new Monitor(), 0, STATUS_CHECK_FREQUENCY_SECS, TimeUnit.SECONDS); + executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS); } @Override @@ -231,7 +250,7 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa // add Instances from last checked time to 10 minutes from now(some buffer for status check) Date now = new Date(); - Date newCheckPoint = new Date(now.getTime() + LOOKAHEAD_WINDOW_MILLIS); + Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis); addNewPendingFeedInstances(lastCheckedAt, newCheckPoint); lastCheckedAt = newCheckPoint; @@ -339,10 +358,10 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa private void deserialize(Path path) throws FalconException { try { Map<String, Object> state = deserializeInternal(path); - pendingInstances = (Map<Pair<String, String>, Set<Date>>) state.get("pendingInstances"); + pendingInstances = (ConcurrentHashMap<Pair<String, String>, Set<Date>>) state.get("pendingInstances"); lastCheckedAt = new Date((Long) state.get("lastCheckedAt")); lastSerializedAt = new Date((Long) 
state.get("lastSerializedAt")); - monitoredFeeds = new HashSet<>(); // will be populated on the onLoad of entities. + monitoredFeeds = new ConcurrentHashSet<>(); // will be populated on the onLoad of entities. LOG.debug("Restored the service from old state."); } catch (IOException | ClassNotFoundException e) { throw new FalconException("Couldn't deserialize the old state", e); @@ -350,10 +369,10 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa } private void initializeService() { - pendingInstances = new HashMap<>(); + pendingInstances = new ConcurrentHashMap<>(); lastCheckedAt = new Date(); lastSerializedAt = new Date(); - monitoredFeeds = new HashSet<>(); + monitoredFeeds = new ConcurrentHashSet<>(); } @SuppressWarnings("unchecked") @@ -364,10 +383,98 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa try { state = (Map<String, Object>) ois.readObject(); } finally { - ois.close(); + IOUtils.closeQuietly(ois); } return state; } -} + /** + * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed + * slaLow or slaHigh. + * + * Only feeds which have defined sla in their definition are considered. + * Only the feed instances between the given time range are considered. + * Start time and end time are both inclusive. 
+ * @param start start time, inclusive + * @param end end time, inclusive + * @return feed instances in the given range that have missed slaLow or slaHigh + * @throws FalconException + */ + public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end) + throws FalconException { + Set<SchedulableEntityInstance> result = new HashSet<>(); + for (Map.Entry<Pair<String, String>, Set<Date>> feedInstances : pendingInstances.entrySet()) { + Pair<String, String> feedClusterPair = feedInstances.getKey(); + Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first); + Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); + Sla sla = FeedHelper.getSLA(cluster, feed); + if (sla != null) { + Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, feedInstances.getValue()); + for (Pair<Date, String> status : slaStatus){ + SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first, + feedClusterPair.second, status.first, EntityType.FEED); + instance.setTags(status.second); + result.add(instance); + } + } + } + return result; + } + + /** + * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances of a given feed between the given time range + * which missed sla. Only those instances are included which have missed either slaLow or slaHigh. 
+ * @param feedName name of the feed + * @param clusterName cluster name + * @param start start time, inclusive + * @param end end time, inclusive + * @return instances of the given feed and cluster in the given range that have missed slaLow or slaHigh + * @throws FalconException + */ + public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(String feedName, String clusterName, + Date start, Date end) throws FalconException { + + Set<SchedulableEntityInstance> result = new HashSet<>(); + Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName); + Set<Date> missingInstances = pendingInstances.get(feedClusterPair); + Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); + Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); + Sla sla = FeedHelper.getSLA(cluster, feed); + if (missingInstances != null && sla != null) { + Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, missingInstances); + for (Pair<Date, String> status : slaStatus){ + SchedulableEntityInstance instance = new SchedulableEntityInstance(feedName, clusterName, status.first, + EntityType.FEED); + instance.setTags(status.second); + result.add(instance); + } + } + return result; + } + Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, Set<Date> missingInstances) + throws FalconException { + String tagCritical = "Missed SLA High"; + String tagWarn = "Missed SLA Low"; + Date now = new Date(); + Frequency slaLow = sla.getSlaLow(); + Frequency slaHigh = sla.getSlaHigh(); + Set<Pair<Date, String>> result = new HashSet<>(); + for (Date nominalTime : missingInstances) { + if (!nominalTime.before(start) && !nominalTime.after(end)) { + ExpressionHelper.setReferenceDate(nominalTime); + ExpressionHelper evaluator = ExpressionHelper.get(); + Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class); + Long slaLowDuration = evaluator.evaluate(slaLow.toString(), Long.class); + Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration); + Date slaWarnTime = new Date(nominalTime.getTime() + 
slaLowDuration); + if (slaCriticalTime.before(now)) { + result.add(new Pair<>(nominalTime, tagCritical)); + } else if (slaWarnTime.before(now)) { + result.add(new Pair<>(nominalTime, tagWarn)); + } + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java new file mode 100644 index 0000000..bc03cb5 --- /dev/null +++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.service; + +import org.apache.falcon.FalconException; +import org.apache.falcon.Pair; +import org.apache.falcon.entity.EntityNotRegisteredException; +import org.apache.falcon.entity.parser.ValidationException; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.feed.Sla; +import org.apache.falcon.resource.AbstractSchedulableEntityManager; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Date; +import java.util.HashSet; +import java.util.Set; + +/** + * Tests for FeedSLAMonitoring Service. + */ +public class FeedSLAMonitoringTest { + + @Test + public void testSLAStatus() throws FalconException { + // sla, start, end, missingInstances + Sla sla = new Sla(); + sla.setSlaLow(new Frequency("days(1)")); + sla.setSlaHigh(new Frequency("days(2)")); + + Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z"); + Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z"); + + Set<Date> missingInstances = new HashSet<>(); + missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start time + missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to start time + missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between + missingInstances.add(SchemaHelper.parseDateUTC("2014-05-07T00:00Z")); + missingInstances.add(SchemaHelper.parseDateUTC("2015-05-05T00:00Z")); // equal to end time + missingInstances.add(SchemaHelper.parseDateUTC("2015-05-06T00:00Z")); // after end time + + Set<Pair<Date, String>> result = FeedSLAMonitoringService.get().getSLAStatus(sla, start, end, missingInstances); + Set<Pair<Date, String>> expected = new HashSet<>(); + expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), "Missed SLA High")); + expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), "Missed SLA High")); + expected.add(new 
Pair<>(SchemaHelper.parseDateUTC("2014-05-07T00:00Z"), "Missed SLA High")); + expected.add(new Pair<>(SchemaHelper.parseDateUTC("2015-05-05T00:00Z"), "Missed SLA High")); + Assert.assertEquals(result, expected); + } + + @Test(expectedExceptions = ValidationException.class, + expectedExceptionsMessageRegExp = "SLA monitoring is not supported for: PROCESS") + public void testInvalidType() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("process", + "in", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); + } + + @Test(expectedExceptions = EntityNotRegisteredException.class, + expectedExceptionsMessageRegExp = ".*\\(FEED\\) not found.*") + public void testInvalidName() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("feed", + "non-existent", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "2015-05-00T00:00Z is not a valid UTC string") + public void testInvalidStart() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-00T00:00Z", "2015-05-05T00:00Z", "*"); + } + + @Test(expectedExceptions = ValidationException.class, + expectedExceptionsMessageRegExp = "start can not be after end") + public void testInvalidRange() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("feed", + null, "2015-05-05T00:00Z", "2014-05-05T00:00Z", "*"); + } + + @Test + public void testOptionalName() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); + AbstractSchedulableEntityManager.validateSlaParams("feed", "", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); + } + + @Test + public void testOptionalEnd() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "", "*"); + 
AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*"); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 9c6aef7..8e4ce97 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -96,9 +96,19 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.falcon.cleanup.service.frequency=days(1) +######### Properties for Feed SLA Monitoring ######### # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour *.feed.sla.serialization.frequency.millis=3600000 +# Do not change unless really sure +# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60 +*.feed.sla.statusCheck.frequency.seconds=600 + +# Do not change unless really sure +# Time Duration (in milliseconds) in future for generating pending feed instances. +# In every cycle pending feed instances are added for monitoring, till this time in future. 
+# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000 +*.feed.sla.lookAheadWindow.millis=900000 ######### Properties for configuring JMS provider - activemq ######### # Default Active MQ url http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java index 1c0fc74..4a20e41 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java @@ -19,6 +19,7 @@ package org.apache.falcon.resource; import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconWebException; import org.apache.falcon.monitors.Dimension; import org.apache.falcon.monitors.Monitored; @@ -33,6 +34,7 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; /** * Entity management operations as REST API for feed and process. 
@@ -52,6 +54,24 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { } @GET + @Path("sla-alert/{type}") + @Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML}) + @Monitored(event = "feed-sla-misses") + public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts( + @Dimension("entityType") @PathParam("type") String entityType, + @Dimension("entityName") @QueryParam("name") String entityName, + @Dimension("start") @QueryParam("start") String start, + @Dimension("end") @QueryParam("end") String end, + @Dimension("colo") @QueryParam("colo") final String colo) { + try { + validateSlaParams(entityType, entityName, start, end, colo); + } catch (Exception e) { + throw FalconWebException.newException(e, Response.Status.BAD_REQUEST); + } + return super.getFeedSLAMissPendingAlerts(entityName, start, end, colo); + } + + @GET @Path("dependencies/{type}/{entity}") @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON}) @Monitored(event = "dependencies")
