Repository: falcon Updated Branches: refs/heads/master c3697de62 -> 91c0a9926
FALCON-2077 Api support for Process SLA Author: Praveen Adlakha <[email protected]> Reviewers: @pallavi-rao Closes #278 from PraveenAdlakha/2077_v1 and squashes the following commits: 3b89e83 [Praveen Adlakha] changed method in EntityType a4c3686 [Praveen Adlakha] comments addressed 860d055 [Praveen Adlakha] FALCON-2077 Api support for Process SLA Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/91c0a992 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/91c0a992 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/91c0a992 Branch: refs/heads/master Commit: 91c0a9926f157e2883ad976004230d863258ae62 Parents: c3697de Author: Praveen Adlakha <[email protected]> Authored: Tue Sep 6 14:53:44 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Sep 6 14:53:44 2016 +0530 ---------------------------------------------------------------------- .../org/apache/falcon/entity/v0/EntityType.java | 10 ++ .../org/apache/falcon/entity/EntityUtil.java | 1 + .../falcon/hadoop/HadoopClientFactory.java | 4 +- .../falcon/persistence/EntitySLAAlertBean.java | 15 +- .../falcon/persistence/MonitoredEntityBean.java | 15 +- .../falcon/persistence/PendingInstanceBean.java | 15 +- common/src/main/resources/startup.properties | 10 +- .../workflow/engine/OozieClientFactory.java | 2 +- .../falcon/jdbc/MonitoringJdbcStateStore.java | 22 +-- .../AbstractSchedulableEntityManager.java | 33 ++-- .../proxy/SchedulableEntityManagerProxy.java | 4 +- .../falcon/service/EntitySLAAlertService.java | 17 +- .../service/EntitySLAMonitoringService.java | 94 +++++----- .../service/EntitySLAAlertServiceTest.java | 4 +- .../falcon/service/EntitySLAMonitoringTest.java | 174 +++++++++++++++++++ .../falcon/service/FeedSLAMonitoringTest.java | 164 ----------------- prism/src/test/resources/startup.properties | 8 +- src/conf/startup.properties | 10 +- .../resource/SchedulableEntityManager.java | 6 +- 19 files changed, 314 insertions(+), 294 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java index 3d55547..29dbc7a 100644 --- a/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java +++ b/client/src/main/java/org/apache/falcon/entity/v0/EntityType.java @@ -101,6 +101,16 @@ public enum EntityType { return ((this != EntityType.CLUSTER) && (this != EntityType.DATASOURCE)); } + public static void assertSchedulable(String entityType){ + EntityType type = EntityType.getEnum(entityType); + if (type.isSchedulable()){ + return; + } else { + throw new IllegalArgumentException("EntityType "+ entityType + + " is not valid,Feed and Process are the valid input type."); + } + } + @edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP"}) public String[] getImmutableProperties() { return immutableProperties; http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index aef1fd5..8fe316c 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -1204,4 +1204,5 @@ public final class EntityUtil { return false; } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java index e30f51e..f32df6d 100644 --- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java +++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java @@ -202,12 +202,12 @@ public final class HadoopClientFactory { // prevent falcon impersonating falcon, no need to use doas final String proxyUserName = ugi.getShortUserName(); if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) { - LOG.info("Creating FS for the login user {}, impersonation not required", + LOG.trace("Creating FS for the login user {}, impersonation not required", proxyUserName); return FileSystem.get(uri, conf); } - LOG.info("Creating FS impersonating user {}", proxyUserName); + LOG.trace("Creating FS impersonating user {}", proxyUserName); return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { public FileSystem run() throws Exception { return FileSystem.get(uri, conf); http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java index e2096fe..1419f48 100644 --- a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java @@ -18,7 +18,6 @@ package org.apache.falcon.persistence; -import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.EntityType; import java.util.Date; @@ -69,9 +68,9 @@ public class EntitySLAAlertBean { return entityType; } - public void setEntityType(String entityType) throws FalconException { - checkEntityType(entityType); - this.entityType = entityType; + public void setEntityType(String entityType) { + EntityType.assertSchedulable(entityType); + this.entityType = entityType.toLowerCase(); } @Basic @@ -157,12 +156,4 @@ public class EntitySLAAlertBean { public static final String NOMINALTIME = "nominalTime"; - void checkEntityType(String entityType)throws FalconException{ - if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){ - return; - } else { - throw new FalconException("EntityType"+ entityType - + " is not valid,Feed and Process are the valid input type."); - } - } } http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java index 1db3d04..c620e45 100644 --- a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java @@ -17,7 +17,6 @@ */ package org.apache.falcon.persistence; -import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.EntityType; import javax.persistence.Entity; @@ -64,9 +63,9 @@ public class MonitoredEntityBean { return entityType; } - public void setEntityType(String entityType) throws FalconException { - checkEntityType(entityType); - this.entityType = entityType; + public void setEntityType(String entityType) { + EntityType.assertSchedulable(entityType); + this.entityType = entityType.toLowerCase(); } @Basic @@ -94,12 +93,4 @@ public class MonitoredEntityBean { public static final String ENTITYTYPE = "entityType"; - void checkEntityType(String entityType)throws FalconException { - if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){ - return; - } else { - throw new FalconException("EntityType"+ entityType - + " is not valid,Feed and Process are the valid input type."); - } - } } http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java index 863abdc..43b6b8e 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java @@ -17,7 +17,6 @@ */ package org.apache.falcon.persistence; -import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.EntityType; import javax.persistence.Entity; @@ -73,9 +72,9 @@ public class PendingInstanceBean { return entityType; } - public void setEntityType(String entityType) throws FalconException { - checkEntityType(entityType); - this.entityType = entityType; + public void setEntityType(String entityType) { + EntityType.assertSchedulable(entityType); + this.entityType = entityType.toLowerCase(); } @Basic @@ -123,12 +122,4 @@ public class PendingInstanceBean { public static final String ENTITYTYPE = "entityType"; - void checkEntityType(String entityType)throws FalconException { - if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){ - return; - } else { - throw new FalconException("EntityType"+ entityType - + " is not valid,Feed and Process are the valid input type."); - } - } } http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 8d64c54..3beab62 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -73,7 +73,7 @@ # org.apache.falcon.state.store.jdbc.JdbcStateStore ## If you wish to use Feed Alert to know when a feed misses a high SLA register your class here -*.feedAlert.listeners= +*.entityAlert.listeners= ##### JMS MQ Broker Implementation class ##### *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory @@ -98,7 +98,7 @@ #Configurations used in UTs debug.config.store.uri=file://${user.dir}/target/store #Location to store state of Feed SLA monitoring service -debug.feed.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingfeedinstances +debug.entity.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingentityinstances debug.config.oozie.conf.uri=${user.dir}/target/oozie debug.system.lib.location=${system.lib.location} debug.broker.url=vm://localhost @@ -122,7 +122,7 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle *.falcon.cleanup.service.frequency=minutes(5) -######### Properties for Feed SLA Monitoring ######### +######### Properties for Entity SLA Monitoring ######### # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour *.feed.sla.serialization.frequency.millis=3600000 @@ -132,13 +132,13 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle # Do not change unless really sure # Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60 -*.feed.sla.statusCheck.frequency.seconds=600 +*.entity.sla.statusCheck.frequency.seconds=600 # Do not change unless really sure # Time Duration (in milliseconds) in future for generating pending feed instances. # In every cycle pending feed instances are added for monitoring, till this time in future. # It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000 -*.feed.sla.lookAheadWindow.millis=900000 +*.entity.sla.lookAheadWindow.millis=900000 ######### Properties for configuring JMS provider - activemq ######### http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java index ae5c5fa..3380b1a 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java @@ -47,7 +47,7 @@ public final class OozieClientFactory { assert cluster != null : "Cluster cant be null"; String oozieUrl = ClusterHelper.getOozieUrl(cluster); - LOG.info("Creating Oozie client object for {}", oozieUrl); + LOG.trace("Creating Oozie client object for {}", oozieUrl); return getClientRef(oozieUrl); } http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index 6a38b0a..c479940 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -62,7 +62,7 @@ public class MonitoringJdbcStateStore { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE); q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName); - q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType); + q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType.toLowerCase()); List result = q.getResultList(); try { if (result.isEmpty()) { @@ -79,7 +79,7 @@ public class MonitoringJdbcStateStore { beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES); q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName); - q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType); + q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType.toLowerCase()); try{ q.executeUpdate(); } finally { @@ -98,7 +98,7 @@ public class MonitoringJdbcStateStore { public List<MonitoredEntityBean> getAllMonitoredEntityForEntity(String entityType) throws ResultNotFoundException { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); List result = q.getResultList(); entityManager.close(); return result; @@ -108,7 +108,7 @@ public class MonitoringJdbcStateStore { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_LATEST_INSTANCE_TIME, Date.class); q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); Date result = (Date)q.getSingleResult(); entityManager.close(); return result; @@ -121,7 +121,7 @@ public class MonitoringJdbcStateStore { q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); try{ q.executeUpdate(); } finally { @@ -135,7 +135,7 @@ public class MonitoringJdbcStateStore { Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY); q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); try{ q.executeUpdate(); } finally { @@ -162,7 +162,7 @@ public class MonitoringJdbcStateStore { Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES); q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); List result = q.getResultList(); entityManager.close(); return result; @@ -198,7 +198,7 @@ public class MonitoringJdbcStateStore { q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); try { return q.getSingleResult(); } finally { @@ -215,7 +215,7 @@ public class MonitoringJdbcStateStore { q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName); q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName); q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime); - q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType); + q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase()); try { return q.getSingleResult(); } finally { @@ -248,7 +248,7 @@ public class MonitoringJdbcStateStore { q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName); q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName); q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime); - q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType); + q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase()); try{ q.executeUpdate(); } finally { @@ -263,7 +263,7 @@ public class MonitoringJdbcStateStore { q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName); q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName); q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime); - q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType); + q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase()); try{ q.executeUpdate(); } finally { http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java index 895f8b2..3bdeb99 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java @@ -111,16 +111,27 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM } } + /** + * Validates the parameters whether SLA is supported or not. + * + * @param entityType currently two entityTypes are supported Process and Feed + * @param entityName name of the entity + * @param start startDate from which SLA is to be looked at + * @param end endDate upto which SLA is to be looked at. + * @param colo colo in which entity is to be looked into + * @throws FalconException if the validation fails + * **/ + public static void validateSlaParams(String entityType, String entityName, String start, String end, String colo) throws FalconException { EntityType type = EntityType.getEnum(entityType); - if (type != EntityType.FEED) { + if (!type.isSchedulable()){ throw new ValidationException("SLA monitoring is not supported for: " + type); } - // validate valid feed name. + // validate valid entity name. if (StringUtils.isNotBlank(entityName)) { - EntityUtil.getEntity(EntityType.FEED, entityName); + EntityUtil.getEntity(entityType, entityName); } Date startTime, endTime; @@ -146,14 +157,14 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM } /** - * Returns the feed instances which are not yet available and have missed either slaLow or slaHigh. - * This api doesn't return the feeds which missed SLA but are now available. Purpose of this api is to show feed - * instances which you need to attend to. + * Returns the entity instances which are not yet available and have missed either slaLow or slaHigh. + * This api doesn't return the entitites which missed SLA but are now available. Purpose of this api is to + * show entity instances which you need to attend to. * @param startStr startTime in * @param endStr */ - public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts(String feedName, String startStr, String endStr, - String colo) { + public SchedulableEntityInstanceResult getEntitySLAMissPendingAlerts(String entityName, String entityType, + String startStr, String endStr, String colo) { Set<SchedulableEntityInstance> instances = new HashSet<>(); try { @@ -161,12 +172,12 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM Date start = EntityUtil.parseDateUTC(startStr); Date end = (endStr == null) ? new Date() : EntityUtil.parseDateUTC(endStr); - if (StringUtils.isBlank(feedName)) { + if (StringUtils.isBlank(entityName)) { instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end)); } else { for (String clusterName : DeploymentUtil.getCurrentClusters()) { - instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(feedName, - clusterName, start, end, EntityType.FEED.toString())); + instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(entityName, + clusterName, start, end, entityType)); } } } catch (FalconException e) { http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 249c273..07334d6 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -113,8 +113,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @GET @Path("sla-alert/{type}") @Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML}) - @Monitored(event = "feed-sla-misses") - public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts( + @Monitored(event = "entity-sla-misses") + public SchedulableEntityInstanceResult getEntitySLAMissPendingAlerts( @Dimension("entityType") @PathParam("type") final String entityType, @Dimension("entityName") @QueryParam("name") final String entityName, @Dimension("start") @QueryParam("start") final String start, http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java index a7cafeb..bcf11e3 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java @@ -39,7 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Service to know which all feeds have missed SLA. + * Service to know which all entities have missed SLA. */ public final class EntitySLAAlertService implements FalconService, EntitySLAListener { @@ -68,7 +68,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList @Override public void init() throws FalconException { String listenerClassNames = StartupProperties.get(). - getProperty("feedAlert.listeners"); + getProperty("entityAlert.listeners"); if (listenerClassNames != null && !listenerClassNames.isEmpty()) { for (String listenerClassName : listenerClassNames.split(",")) { listenerClassName = listenerClassName.trim(); @@ -80,7 +80,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList } } - String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600"); + String freq = StartupProperties.get().getProperty("entity.sla.statusCheck.frequency.seconds", "600"); int statusCheckFrequencySeconds = Integer.parseInt(freq); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); @@ -106,7 +106,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList } void processSLACandidates(){ - //Get all feeds instances to be monitored + //Get all entity instances to be monitored List<PendingInstanceBean> pendingInstanceBeanList = store.getAllPendingInstances(); if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){ return; @@ -139,17 +139,16 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList store.putSLAAlertInstance(entityName, clusterName, entityType, nominalTime, true, false); //Mark in DB as SLA missed - LOG.info("Feed :"+ entityName - + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLALow"); + LOG.info("Entity : {} Cluster : {} Nominal Time : {} missed SLALow", entityName, entityType, + clusterName, nominalTime); } else if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_CRITICAL)){ if (entityType.equals(EntityType.PROCESS.name())){ store.putSLAAlertInstance(entityName, clusterName, entityType, nominalTime, true, false); } store.updateSLAAlertInstance(entityName, clusterName, nominalTime, entityType); - LOG.info("Entity :"+ entityName - + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "EntityType:"+ entityType - + "missed SLAHigh"); + LOG.info("Entity :{} EntityType : {} Cluster: {} Nominal Time: {} missed SLAHigh", entityName, + entityType , clusterName , nominalTime); highSLAMissed(entityName, clusterName, EntityType.valueOf(entityType), nominalTime); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index 185e087..1e20a2b 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -20,7 +20,6 @@ package org.apache.falcon.service; import java.text.ParseException; import java.util.HashSet; import java.util.ArrayList; -import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Set; @@ -72,7 +71,7 @@ import com.google.common.annotations.VisibleForTesting; * Service to monitor Feed SLAs. */ public final class EntitySLAMonitoringService implements ConfigurationChangeListener, FalconService { - private static final Logger LOG = LoggerFactory.getLogger("FeedSLA"); + private static final Logger LOG = LoggerFactory.getLogger(EntitySLAMonitoringService.class); private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore(); @@ -98,15 +97,15 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList /** - * Frequency in seconds of "status check" for pending feed instances. + * Frequency in seconds of "status check" for pending entity instances. */ private int statusCheckFrequencySeconds; // 10 minutes /** - * Time Duration (in milliseconds) in future for generating pending feed instances. + * Time Duration (in milliseconds) in future for generating pending entity instances. * - * In every cycle pending feed instances are added for monitoring, till this time in future. + * In every cycle pending entity instances are added for monitoring, till this time in future. */ private int lookAheadWindowMillis; // 15 MINUTES @@ -117,7 +116,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList private FileSystem fileSystem; /** - * Working directory for the feed sla monitoring service. + * Working directory for the entity sla monitoring service. */ private Path storePath; @@ -294,15 +293,15 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList @Override public void init() throws FalconException { - String uri = StartupProperties.get().getProperty("feed.sla.service.store.uri"); + String uri = StartupProperties.get().getProperty("entity.sla.service.store.uri"); storePath = new Path(uri); - filePath = new Path(storePath, "feedSLAMonitoringService"); + filePath = new Path(storePath, "entitySLAMonitoringService"); fileSystem = initializeFileSystem(); - String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600"); + String freq = StartupProperties.get().getProperty("entity.sla.statusCheck.frequency.seconds", "600"); statusCheckFrequencySeconds = Integer.parseInt(freq); - freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000"); + freq = StartupProperties.get().getProperty("entity.sla.lookAheadWindow.millis", "900000"); lookAheadWindowMillis = Integer.parseInt(freq); LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString()); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); @@ -326,13 +325,13 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList try { fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri()); if (!fileSystem.exists(storePath)) { - LOG.info("Creating directory for pending feed instances: {}", storePath); + LOG.info("Creating directory for pending entity instances: {}", storePath); // set permissions so config store dir is owned by falcon alone HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION); } return fileSystem; } catch (Exception e) { - throw new RuntimeException("Unable to bring up feed sla store for path: " + storePath, e); + throw new RuntimeException("Unable to bring up entity sla store for path: " + storePath, e); } } @@ -353,8 +352,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList // add Instances from last checked time to 10 minutes from now(some buffer for status check) Date now = new Date(); Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis); - addNewPendingFeedInstances(newCheckPoint, EntityType.FEED.toString()); - addNewPendingFeedInstances(newCheckPoint, EntityType.PROCESS.toString()); + addNewPendingEntityInstances(newCheckPoint, EntityType.FEED.toString()); + addNewPendingEntityInstances(newCheckPoint, EntityType.PROCESS.toString()); } } catch (Throwable e) { LOG.error("Feed SLA monitoring failed: ", e); @@ -363,7 +362,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList } - void addNewPendingFeedInstances(Date to, String entityType) throws FalconException { + void addNewPendingEntityInstances(Date to, String entityType) throws FalconException { Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE. getAllMonitoredEntityForEntity(entityType); @@ -439,7 +438,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList } } - // checks whether a given feed instance is available or not + // checks whether a given entity instance is available or not private boolean checkEntityInstanceAvailability(String entityName, String clusterName, Date nominalTime, String entityType) throws FalconException { Entity entity = EntityUtil.getEntity(entityType, entityName); @@ -451,8 +450,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine(); InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime, nominalTime, null, null); if (instancesResult.getStatus().equals(APIResult.Status.SUCCEEDED)){ - LOG.debug("Entity instance(feed:{}, cluster:{}, instanceTime:{}) is available.", entity.getName(), - clusterName, nominalTime); + LOG.debug("Entity instance(Process:{}, cluster:{}, instanceTime:{}) is available.", + entity.getName(), clusterName, nominalTime); return true; } return false; @@ -481,35 +480,54 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList /** - * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed - * slaLow or slaHigh. + * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} and {@link org.apache.falcon.entity.v0.process.Process} + * instances between given time range which have missed slaLow or slaHigh. * - * Only feeds which have defined sla in their definition are considered. - * Only the feed instances between the given time range are considered. + * Only entities which have defined sla in their definition are considered. + * Only the entity instances between the given time range are considered. * Start time and end time are both inclusive. * @param start start time, inclusive * @param end end time, inclusive - * @return Set of pending feed instances belonging to the given range which have missed SLA + * @return Set of pending entity instances belonging to the given range which have missed SLA * @throws FalconException */ public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(Date start, Date end) throws FalconException { Set<SchedulableEntityInstance> result = new HashSet<>(); for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){ - Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getEntityName(), + Pair<String, String> entityClusterPair = new Pair<>(pendingInstanceBean.getEntityName(), pendingInstanceBean.getClusterName()); - Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first); - Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); - Sla sla = FeedHelper.getSLA(cluster, feed); - if (sla != null) { - Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end, - MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(), - pendingInstanceBean.getClusterName(), EntityType.FEED.toString())); - for (Pair<Date, String> status : slaStatus) { - SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first, - feedClusterPair.second, status.first, EntityType.FEED); - instance.setTags(status.second); - result.add(instance); + String entityType = pendingInstanceBean.getEntityType(); + if (entityType.equalsIgnoreCase(EntityType.FEED.toString())){ + Feed feed = EntityUtil.getEntity(entityType, entityClusterPair.first); + Cluster cluster = FeedHelper.getCluster(feed, entityClusterPair.second); + Sla sla = FeedHelper.getSLA(cluster, feed); + if (sla != null) { + Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end, + MONITORING_JDBC_STATE_STORE.getNominalInstances(entityClusterPair.first, + entityClusterPair.second, entityType)); + for (Pair<Date, String> status : slaStatus) { + SchedulableEntityInstance instance = new SchedulableEntityInstance(entityClusterPair.first, + entityClusterPair.second, status.first, EntityType.FEED); + instance.setTags(status.second); + result.add(instance); + } + } + } else { + Process process = EntityUtil.getEntity(entityType, entityClusterPair.first); + org.apache.falcon.entity.v0.process.Cluster cluster = ProcessHelper.getCluster(process, + entityClusterPair.second); + org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(cluster, process); + if (sla != null){ + Set<Pair<Date, String>> slaStatus = getProcessSLAStatus(sla, start, end, + MONITORING_JDBC_STATE_STORE.getNominalInstances(entityClusterPair.first, + entityClusterPair.second, entityType)); + for (Pair<Date, String> status : slaStatus) { + SchedulableEntityInstance instance = new SchedulableEntityInstance(entityClusterPair.first, + entityClusterPair.second, status.first, EntityType.PROCESS); + instance.setTags(status.second); + result.add(instance); + } } } } @@ -528,12 +546,10 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList */ public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName, Date start, Date end, String entityType) throws FalconException { - Set<SchedulableEntityInstance> result = new HashSet<>(); List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName, entityType); - if (missingInstances == null || !Arrays.asList(EntityType.FEED.toString(), - EntityType.PROCESS.toString()).contains(entityType)){ + if (missingInstances == null){ return result; } Entity entity = EntityUtil.getEntity(entityType, entityName); http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java index c8b4f5e..8b51354 100644 --- a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java +++ b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java @@ -105,7 +105,7 @@ public class EntitySLAAlertServiceTest extends AbstractTestBase { } } - @Test + @Test(expectedExceptions = javax.persistence.NoResultException.class) public static void processSLALowCandidates() throws FalconException, InterruptedException{ Date dateOne = new Date(System.currentTimeMillis()-100000); @@ -141,7 +141,7 @@ public class EntitySLAAlertServiceTest extends AbstractTestBase { dateOne, EntityType.FEED.toString()).getIsSLALowMissed()); } - @Test + @Test(expectedExceptions = javax.persistence.NoResultException.class) public static void processSLACandidateProcess() throws FalconException, InterruptedException{ Date dateOne = new Date(System.currentTimeMillis()-130000); http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/test/java/org/apache/falcon/service/EntitySLAMonitoringTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/EntitySLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/EntitySLAMonitoringTest.java new file mode 100644 index 0000000..2bc4cbf --- /dev/null +++ b/prism/src/test/java/org/apache/falcon/service/EntitySLAMonitoringTest.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.service; + +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TimeZone; + +import org.apache.falcon.FalconException; +import org.apache.falcon.Pair; +import org.apache.falcon.entity.AbstractTestBase; +import org.apache.falcon.entity.EntityNotRegisteredException; +import org.apache.falcon.entity.parser.ValidationException; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.ClusterType; +import org.apache.falcon.entity.v0.feed.Clusters; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Sla; +import org.apache.falcon.entity.v0.feed.Validity; +import org.apache.falcon.resource.AbstractSchedulableEntityManager; + +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Tests for EntitySLAMonitoring Service. + */ +public class EntitySLAMonitoringTest extends AbstractTestBase { + private static final String CLUSTER_NAME = "testCluster"; + private static final String FEED_NAME = "testFeed"; + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + private static final String TAG_CRITICAL = EntitySLAMonitoringService.get().TAG_CRITICAL; + + @Test + public void testSLAStatus() throws FalconException { + // sla, start, end, missingInstances + Sla sla = new Sla(); + sla.setSlaLow(new Frequency("days(1)")); + sla.setSlaHigh(new Frequency("days(2)")); + + Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z"); + Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z"); + + List<Date> missingInstances = new ArrayList<>(); + missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start time + missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to start time + missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between + missingInstances.add(SchemaHelper.parseDateUTC("2014-05-07T00:00Z")); + missingInstances.add(SchemaHelper.parseDateUTC("2015-05-05T00:00Z")); // equal to end time + missingInstances.add(SchemaHelper.parseDateUTC("2015-05-06T00:00Z")); // after end time + + Set<Pair<Date, String>> result = EntitySLAMonitoringService.get().getFeedSLAStatus(sla, start, end, + missingInstances); + Set<Pair<Date, String>> expected = new HashSet<>(); + expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), TAG_CRITICAL)); + expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), TAG_CRITICAL)); + expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-07T00:00Z"), TAG_CRITICAL)); + expected.add(new Pair<>(SchemaHelper.parseDateUTC("2015-05-05T00:00Z"), TAG_CRITICAL)); + Assert.assertEquals(result, expected); + } + + @Test(expectedExceptions = EntityNotRegisteredException.class, + expectedExceptionsMessageRegExp = ".*\\(FEED\\) not found.*") + public void testInvalidFeedName() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("feed", + "non-existent", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); + } + + @Test(expectedExceptions = EntityNotRegisteredException.class, + expectedExceptionsMessageRegExp = ".*\\(PROCESS\\) not found.*") + public void testInvalidProcessName() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("process", + "non-existent", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "2015-05-00T00:00Z is not a valid UTC string") + public void testInvalidStart() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-00T00:00Z", "2015-05-05T00:00Z", "*"); + AbstractSchedulableEntityManager.validateSlaParams("process", null, + "2015-05-00T00:00Z", "2015-05-05T00:00Z", "*"); + } + + @Test(expectedExceptions = ValidationException.class, + expectedExceptionsMessageRegExp = "start can not be after end") + public void testInvalidRange() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("feed", + null, "2015-05-05T00:00Z", "2014-05-05T00:00Z", "*"); + AbstractSchedulableEntityManager.validateSlaParams("process", + null, "2015-05-05T00:00Z", "2014-05-05T00:00Z", "*"); + } + + @Test + public void testOptionalName() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); + AbstractSchedulableEntityManager.validateSlaParams("feed", "", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); + AbstractSchedulableEntityManager.validateSlaParams("process", null, "2015-05-05T00:00Z", "2015-05-05T00:00Z", + "*"); + AbstractSchedulableEntityManager.validateSlaParams("process", "", "2015-05-05T00:00Z", "2015-05-05T00:00Z", + "*"); + } + + @Test + public void testOptionalEnd() throws FalconException { + AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "", "*"); + AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*"); + AbstractSchedulableEntityManager.validateSlaParams("process", null, "2015-05-05T00:00Z", "", "*"); + AbstractSchedulableEntityManager.validateSlaParams("process", null, "2015-05-05T00:00Z", null, "*"); + } + + private Cluster publishCluster() throws FalconException { + Cluster cluster = new Cluster(); + cluster.setName(CLUSTER_NAME); + cluster.setColo("default"); + getStore().publish(EntityType.CLUSTER, cluster); + return cluster; + + } + + private Feed publishFeed(Cluster cluster, String frequency, String start, String end) + throws FalconException, ParseException { + Feed feed = new Feed(); + feed.setName(FEED_NAME); + Frequency f = new Frequency(frequency); + feed.setFrequency(f); + feed.setTimezone(UTC); + Clusters fClusters = new Clusters(); + org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster(); + fCluster.setType(ClusterType.SOURCE); + fCluster.setName(cluster.getName()); + fCluster.setValidity(getFeedValidity(start, end)); + fClusters.getClusters().add(fCluster); + feed.setClusters(fClusters); + getStore().publish(EntityType.FEED, feed); + return feed; + } + + private Validity getFeedValidity(String start, String end) throws ParseException { + Validity validity = new Validity(); + validity.setStart(getDate(start)); + validity.setEnd(getDate(end)); + return validity; + } + + private Date getDate(String dateString) throws ParseException { + DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z"); + return format.parse(dateString); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java deleted file mode 100644 index 9cf50c2..0000000 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.service; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.TimeZone; - -import org.apache.falcon.FalconException; -import org.apache.falcon.Pair; -import org.apache.falcon.entity.AbstractTestBase; -import org.apache.falcon.entity.EntityNotRegisteredException; -import org.apache.falcon.entity.parser.ValidationException; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.entity.v0.feed.Clusters; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Sla; -import org.apache.falcon.entity.v0.feed.Validity; -import org.apache.falcon.resource.AbstractSchedulableEntityManager; - -import org.testng.Assert; -import org.testng.annotations.Test; - -/** - * Tests for FeedSLAMonitoring Service. - */ -public class FeedSLAMonitoringTest extends AbstractTestBase { - private static final String CLUSTER_NAME = "testCluster"; - private static final String FEED_NAME = "testFeed"; - private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); - private static final String TAG_CRITICAL = EntitySLAMonitoringService.get().TAG_CRITICAL; - - @Test - public void testSLAStatus() throws FalconException { - // sla, start, end, missingInstances - Sla sla = new Sla(); - sla.setSlaLow(new Frequency("days(1)")); - sla.setSlaHigh(new Frequency("days(2)")); - - Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z"); - Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z"); - - List<Date> missingInstances = new ArrayList<>(); - missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start time - missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to start time - missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between - missingInstances.add(SchemaHelper.parseDateUTC("2014-05-07T00:00Z")); - missingInstances.add(SchemaHelper.parseDateUTC("2015-05-05T00:00Z")); // equal to end time - missingInstances.add(SchemaHelper.parseDateUTC("2015-05-06T00:00Z")); // after end time - - Set<Pair<Date, String>> result = EntitySLAMonitoringService.get().getFeedSLAStatus(sla, start, end, - missingInstances); - Set<Pair<Date, String>> expected = new HashSet<>(); - expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), TAG_CRITICAL)); - expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), TAG_CRITICAL)); - expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-07T00:00Z"), TAG_CRITICAL)); - expected.add(new Pair<>(SchemaHelper.parseDateUTC("2015-05-05T00:00Z"), TAG_CRITICAL)); - Assert.assertEquals(result, expected); - } - - @Test(expectedExceptions = ValidationException.class, - expectedExceptionsMessageRegExp = "SLA monitoring is not supported for: PROCESS") - public void testInvalidType() throws FalconException { - AbstractSchedulableEntityManager.validateSlaParams("process", - "in", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); - } - - @Test(expectedExceptions = EntityNotRegisteredException.class, - expectedExceptionsMessageRegExp = ".*\\(FEED\\) not found.*") - public void testInvalidName() throws FalconException { - AbstractSchedulableEntityManager.validateSlaParams("feed", - "non-existent", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "2015-05-00T00:00Z is not a valid UTC string") - public void testInvalidStart() throws FalconException { - AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-00T00:00Z", "2015-05-05T00:00Z", "*"); - } - - @Test(expectedExceptions = ValidationException.class, - expectedExceptionsMessageRegExp = "start can not be after end") - public void testInvalidRange() throws FalconException { - AbstractSchedulableEntityManager.validateSlaParams("feed", - null, "2015-05-05T00:00Z", "2014-05-05T00:00Z", "*"); - } - - @Test - public void testOptionalName() throws FalconException { - AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); - AbstractSchedulableEntityManager.validateSlaParams("feed", "", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*"); - } - - @Test - public void testOptionalEnd() throws FalconException { - AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "", "*"); - AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*"); - } - - private Cluster publishCluster() throws FalconException { - Cluster cluster = new Cluster(); - cluster.setName(CLUSTER_NAME); - cluster.setColo("default"); - getStore().publish(EntityType.CLUSTER, cluster); - return cluster; - - } - - private Feed publishFeed(Cluster cluster, String frequency, String start, String end) - throws FalconException, ParseException { - Feed feed = new Feed(); - feed.setName(FEED_NAME); - Frequency f = new Frequency(frequency); - feed.setFrequency(f); - feed.setTimezone(UTC); - Clusters fClusters = new Clusters(); - org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster(); - fCluster.setType(ClusterType.SOURCE); - fCluster.setName(cluster.getName()); - fCluster.setValidity(getFeedValidity(start, end)); - fClusters.getClusters().add(fCluster); - feed.setClusters(fClusters); - getStore().publish(EntityType.FEED, feed); - return feed; - } - - private Validity getFeedValidity(String start, String end) throws ParseException { - Validity validity = new Validity(); - validity.setStart(getDate(start)); - validity.setEnd(getDate(end)); - return validity; - } - - private Date getDate(String dateString) throws ParseException { - DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z"); - return format.parse(dateString); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/prism/src/test/resources/startup.properties ---------------------------------------------------------------------- diff --git a/prism/src/test/resources/startup.properties b/prism/src/test/resources/startup.properties index d72dbba..5258b96 100644 --- a/prism/src/test/resources/startup.properties +++ b/prism/src/test/resources/startup.properties @@ -73,7 +73,7 @@ # org.apache.falcon.state.store.jdbc.JdbcStateStore ## If you wish to use Feed Alert to know when a feed misses a high SLA register your class here -*.feedAlert.listeners= +*.entityAlert.listeners= ##### JMS MQ Broker Implementation class ##### *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory @@ -97,7 +97,7 @@ #Configurations used in UTs debug.config.store.uri=file://${user.dir}/target/store #Location to store state of Feed SLA monitoring service -debug.feed.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingfeedinstances +debug.entity.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingentityinstances debug.config.oozie.conf.uri=${user.dir}/target/oozie debug.system.lib.location=${system.lib.location} debug.broker.url=vm://localhost @@ -131,13 +131,13 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle # Do not change unless really sure # Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60 -*.feed.sla.statusCheck.frequency.seconds=600 +*.entity.sla.statusCheck.frequency.seconds=600 # Do not change unless really sure # Time Duration (in milliseconds) in future for generating pending feed instances. # In every cycle pending feed instances are added for monitoring, till this time in future. # It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000 -*.feed.sla.lookAheadWindow.millis=900000 +*.entity.sla.lookAheadWindow.millis=900000 ######### Properties for configuring JMS provider - activemq ######### http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index b663f04..6d82516 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -137,7 +137,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.extension.store.uri=file://${falcon.home}/extensions/ #Location to store state of Feed SLA monitoring service -*.feed.sla.service.store.uri = file://${falcon.home}/data/sla/pendingfeedinstances +*.entity.sla.service.store.uri = file://${falcon.home}/data/sla/pendingentityinstances # Location of libraries that is shipped to Hadoop *.system.lib.location=${falcon.home}/server/webapp/${falcon.app.type}/WEB-INF/lib @@ -152,7 +152,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ # Default timeout in minutes to load entities *.config.store.start.timeout.minutes=30 -######### Properties for Feed SLA Monitoring ######### +######### Properties for Entity SLA Monitoring ######### # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour *.feed.sla.serialization.frequency.millis=3600000 @@ -162,16 +162,16 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ # Do not change unless really sure # Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60 -*.feed.sla.statusCheck.frequency.seconds=600 +*.entity.sla.statusCheck.frequency.seconds=600 # Do not change unless really sure # Time Duration (in milliseconds) in future for generating pending feed instances. # In every cycle pending feed instances are added for monitoring, till this time in future. # It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000 -*.feed.sla.lookAheadWindow.millis=900000 +*.entity.sla.lookAheadWindow.millis=900000 ##Add if you want to enable BacklogMetricService -#*.feedAlert.listeners=org.apache.falcon.service.BacklogMetricEmitterService +#*.entityAlert.listeners=org.apache.falcon.service.BacklogMetricEmitterService ######### Properties for configuring JMS provider - activemq ######### # Default Active MQ url http://git-wip-us.apache.org/repos/asf/falcon/blob/91c0a992/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java index 657ef9e..5525207 100644 --- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java +++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java @@ -168,8 +168,8 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { @GET @Path("sla-alert/{type}") @Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML}) - @Monitored(event = "feed-sla-misses") - public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts( + @Monitored(event = "entity-sla-misses") + public SchedulableEntityInstanceResult getEntitySLAMissPendingAlerts( @Dimension("entityType") @PathParam("type") String entityType, @Dimension("entityName") @QueryParam("name") String entityName, @Dimension("start") @QueryParam("start") String start, @@ -177,7 +177,7 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager { @Dimension("colo") @QueryParam("colo") final String colo) { try { validateSlaParams(entityType, entityName, start, end, colo); - return super.getFeedSLAMissPendingAlerts(entityName, start, end, colo); + return super.getEntitySLAMissPendingAlerts(entityName, entityType, start, end, colo); } catch (Throwable e) { throw FalconWebException.newAPIException(e); }
