http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java 
b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
new file mode 100644
index 0000000..f931625
--- /dev/null
+++ 
b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import java.text.ParseException;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
+import org.apache.falcon.entity.FeedInstanceStatus;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.process.Clusters;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.apache.falcon.persistence.MonitoredEntityBean;
+import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.util.DateUtil;
+import org.apache.falcon.util.DeploymentUtil;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.falcon.entity.v0.process.Process;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Service to monitor Feed and Process SLAs.
+ */
+public final class EntitySLAMonitoringService implements 
ConfigurationChangeListener, FalconService {
+    private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
+
+    private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE 
= new MonitoringJdbcStateStore();
+
+    private static final int ONE_MS = 1;
+
+    private static final EntitySLAMonitoringService SERVICE = new 
EntitySLAMonitoringService();
+
+    public static final String TAG_CRITICAL = "Missed-SLA-High";
+    public static final String TAG_WARN = "Missed-SLA-Low";
+
+    /**
+     * Private constructor; the only instance is the one created for the SERVICE constant.
+     */
+    private EntitySLAMonitoringService() {
+    }
+
+    /**
+     * Returns the shared instance of this service.
+     *
+     * @return the instance held in SERVICE
+     */
+    public static EntitySLAMonitoringService get() {
+        return SERVICE;
+    }
+
+    /**
+     * Permissions for storePath.
+     */
+    private static final FsPermission STORE_PERMISSION = new 
FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+
+
+    /**
+     * Frequency in seconds of "status check" for pending feed instances.
+     */
+    private int statusCheckFrequencySeconds; // 10 minutes
+
+
+    /**
+     * Time Duration (in milliseconds) in future for generating pending feed 
instances.
+     *
+     * In every cycle pending feed instances are added for monitoring, till 
this time in future.
+     */
+    private int lookAheadWindowMillis; // 15 MINUTES
+
+
+    /**
+     * Filesystem used for serializing and deserializing.
+     */
+    private FileSystem fileSystem;
+
+    /**
+     * Working directory for the feed sla monitoring service.
+     */
+    private Path storePath;
+
+    /**
+     * Path to store the state of the monitoring service.
+     */
+    private Path filePath;
+
+    /**
+     * Registers a newly added feed or process for SLA monitoring.
+     *
+     * A feed is registered when one of its clusters is in the current colo and resolves to a non-null
+     * SLA. A process is registered when it passes the SLA guard and one of its clusters is in the
+     * current colo. Entities of any other type are ignored.
+     *
+     * NOTE(review): the feed guard also passes when only an SLA (and no locations) is defined, which
+     * does not match the "fileSystemStorage only" remark below - confirm the intended condition.
+     *
+     * @param entity the entity that was added
+     * @throws FalconException declared by the listener contract
+     */
+    @Override
+    public void onAdd(Entity entity) throws FalconException {
+        final Set<String> coloClusters = DeploymentUtil.getCurrentClusters();
+        final EntityType type = entity.getEntityType();
+        if (type == EntityType.FEED) {
+            final Feed feed = (Feed) entity;
+            // currently sla service is enabled only for fileSystemStorage
+            final boolean eligible = feed.getLocations() != null
+                    || feed.getSla() != null
+                    || checkFeedClusterSLA(feed);
+            if (eligible) {
+                for (Cluster feedCluster : feed.getClusters().getClusters()) {
+                    if (!coloClusters.contains(feedCluster.getName())
+                            || FeedHelper.getSLA(feedCluster, feed) == null) {
+                        continue;
+                    }
+                    LOG.debug("Adding feed:{} for monitoring", feed.getName());
+                    MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), EntityType.FEED.toString());
+                    // one registration per feed is enough
+                    break;
+                }
+            }
+        }
+        if (type == EntityType.PROCESS) {
+            final Process process = (Process) entity;
+            if (process.getSla() != null || checkProcessClusterSLA(process)) {
+                for (org.apache.falcon.entity.v0.process.Cluster processCluster
+                        : process.getClusters().getClusters()) {
+                    if (!coloClusters.contains(processCluster.getName())) {
+                        continue;
+                    }
+                    LOG.debug("Adding process:{} for monitoring", process.getName());
+                    MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(),
+                            EntityType.PROCESS.toString());
+                    // one registration per process is enough
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Tells whether any cluster of the given feed resolves to a non-null SLA.
+     *
+     * @param feed feed whose clusters are inspected
+     * @return true if FeedHelper.getSLA yields an SLA for at least one cluster, false otherwise
+     */
+    public Boolean checkFeedClusterSLA(Feed feed){
+        for (Cluster feedCluster : feed.getClusters().getClusters()) {
+            if (FeedHelper.getSLA(feedCluster, feed) != null) {
+                return Boolean.TRUE;
+            }
+        }
+        return Boolean.FALSE;
+    }
+
+
+    /**
+     * Tells whether any cluster of the given process resolves to a non-null SLA.
+     *
+     * @param process process whose clusters are inspected
+     * @return true if ProcessHelper.getSLA yields an SLA for at least one cluster, false otherwise
+     */
+    public Boolean checkProcessClusterSLA(Process process){
+        for (org.apache.falcon.entity.v0.process.Cluster processCluster
+                : process.getClusters().getClusters()) {
+            if (ProcessHelper.getSLA(processCluster, process) != null) {
+                return Boolean.TRUE;
+            }
+        }
+        return Boolean.FALSE;
+    }
+
+    /**
+     * Stops SLA monitoring for a removed feed or process.
+     *
+     * For every cluster of the entity that belongs to the current colo, the monitored-entity record
+     * and the pending instances of that cluster are deleted from the state store. The guards mirror
+     * the ones used by onAdd(), so that anything onAdd() may have registered is unregistered here.
+     * Entities of any other type are ignored.
+     *
+     * @param entity the entity that was removed
+     * @throws FalconException declared by the listener contract
+     */
+    @Override
+    public void onRemove(Entity entity) throws FalconException {
+        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+        if (entity.getEntityType() == EntityType.FEED) {
+            Feed feed = (Feed) entity;
+            // Same guard as onAdd(): checking only getLocations() here would leave a feed that was
+            // registered because of its SLA alone monitored forever.
+            if (feed.getLocations() != null || feed.getSla() != null || checkFeedClusterSLA(feed)) {
+                for (Cluster cluster : feed.getClusters().getClusters()) {
+                    if (currentClusters.contains(cluster.getName())
+                            && FeedHelper.getSLA(cluster, feed) != null) {
+                        MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(),
+                                EntityType.FEED.toString());
+                        MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName(),
+                                EntityType.FEED.toString());
+                    }
+                }
+            }
+        }
+        if (entity.getEntityType() == EntityType.PROCESS){
+            Process process = (Process) entity;
+            // Same guard as onAdd(): a process whose SLA is defined only at cluster level must be
+            // unregistered as well.
+            if (process.getSla() != null || checkProcessClusterSLA(process)){
+                for (org.apache.falcon.entity.v0.process.Cluster cluster
+                        : process.getClusters().getClusters()) {
+                    if (currentClusters.contains(cluster.getName())) {
+                        MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(process.getName(),
+                                EntityType.PROCESS.toString());
+                        MONITORING_JDBC_STATE_STORE.deletePendingInstances(process.getName(), cluster.getName(),
+                                EntityType.PROCESS.toString());
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) {
+        if (feed.getLocations() != null) {
+            Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+            for (Cluster cluster : feed.getClusters().getClusters()) {
+                if (currentClusters.contains(cluster.getName()) && 
FeedHelper.getSLA(cluster, feed) != null) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean isSLAMonitoringEnabledInCurrentColo(Process process) {
+
+        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+        for (org.apache.falcon.entity.v0.process.Cluster  cluster : 
process.getClusters().getClusters()) {
+            if (currentClusters.contains(cluster.getName()) && 
ProcessHelper.getSLA(cluster, process) != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Reconciles SLA monitoring state when a feed or process definition is updated.
+     *
+     * For both entity types the decision is the same:
+     * if the new definition is no longer SLA-monitored in this colo, the old one is unregistered;
+     * otherwise, if the old definition was not monitored, the new one is registered;
+     * otherwise pending instances are dropped for every cluster whose SLA was removed by the update.
+     *
+     * @param oldEntity definition before the update
+     * @param newEntity definition after the update
+     * @throws FalconException if registering or unregistering the entity fails
+     */
+    @Override
+    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+        if (newEntity.getEntityType() == EntityType.FEED) {
+            Feed oldFeed = (Feed) oldEntity;
+            Feed newFeed = (Feed) newEntity;
+            if (!isSLAMonitoringEnabledInCurrentColo(newFeed)) {
+                onRemove(oldFeed);
+            } else if (!isSLAMonitoringEnabledInCurrentColo(oldFeed)) {
+                onAdd(newFeed);
+            } else {
+                List<String> slaRemovedClusters = new ArrayList<>();
+                for (String oldCluster : EntityUtil.getClustersDefinedInColos(oldFeed)) {
+                    if (FeedHelper.getSLA(oldCluster, oldFeed) != null
+                        && FeedHelper.getSLA(oldCluster, newFeed) == null) {
+                        slaRemovedClusters.add(oldCluster);
+                    }
+                }
+                updatePendingInstances(newFeed.getName(), slaRemovedClusters, EntityType.FEED.toString());
+            }
+        }
+        if (newEntity.getEntityType() == EntityType.PROCESS) {
+            Process oldProcess = (Process) oldEntity;
+            Process newProcess = (Process) newEntity;
+            // Mirrors the feed branch. The checks used to be swapped (old/new inverted), which
+            // unregistered processes that had just gained an SLA and re-added ones that lost it.
+            if (!isSLAMonitoringEnabledInCurrentColo(newProcess)){
+                onRemove(oldProcess);
+            } else if (!isSLAMonitoringEnabledInCurrentColo(oldProcess)){
+                onAdd(newProcess);
+            } else {
+                List<String> slaRemovedClusters = new ArrayList<>();
+                for (String oldCluster : EntityUtil.getClustersDefined(oldProcess)){
+                    if (ProcessHelper.getSLA(oldCluster, oldProcess) != null
+                            && ProcessHelper.getSLA(oldCluster, newProcess) == null){
+                        slaRemovedClusters.add(oldCluster);
+                    }
+                }
+                updatePendingInstances(newProcess.getName(), slaRemovedClusters, EntityType.PROCESS.toString());
+            }
+        }
+    }
+
+    /**
+     * Deletes the pending instances of an entity on each of the given clusters.
+     *
+     * @param entityName name of the entity
+     * @param slaRemovedClusters clusters whose pending instances are deleted
+     * @param entityType entity type string passed through to the state store
+     */
+    void updatePendingInstances(String entityName, List<String> slaRemovedClusters , String entityType){
+        for (String removedCluster : slaRemovedClusters) {
+            MONITORING_JDBC_STATE_STORE.deletePendingInstances(entityName, removedCluster, entityType);
+        }
+    }
+
+    /**
+     * Handles a reloaded entity exactly like a newly added one.
+     *
+     * @param entity the entity that was reloaded
+     * @throws FalconException if onAdd fails
+     */
+    @Override
+    public void onReload(Entity entity) throws FalconException {
+        this.onAdd(entity);
+    }
+
+    /**
+     * Returns the name of this service.
+     *
+     * @return the simple class name of EntitySLAMonitoringService
+     */
+    @Override
+    public String getName() {
+        final String serviceName = EntitySLAMonitoringService.class.getSimpleName();
+        return serviceName;
+    }
+
+    /**
+     * Initializes the service: resolves the store path, brings up the file system, reads the
+     * status-check frequency and look-ahead window, and schedules the Monitor task.
+     *
+     * Configuration problems (missing store URI, non-numeric or out-of-range values) are reported as
+     * FalconException instead of surfacing as unchecked exceptions from Path, Integer.parseInt or
+     * the executor.
+     *
+     * @throws FalconException if a required startup property is missing or invalid
+     */
+    @Override
+    public void init() throws FalconException {
+        String uri = StartupProperties.get().getProperty("feed.sla.service.store.uri");
+        if (uri == null || uri.trim().isEmpty()) {
+            throw new FalconException("Required startup property feed.sla.service.store.uri is not set");
+        }
+        storePath = new Path(uri);
+        filePath = new Path(storePath, "feedSLAMonitoringService");
+        fileSystem = initializeFileSystem();
+
+        // scheduleWithFixedDelay rejects a delay that is not strictly positive
+        statusCheckFrequencySeconds = readIntProperty("feed.sla.statusCheck.frequency.seconds", "600", 1);
+        lookAheadWindowMillis = readIntProperty("feed.sla.lookAheadWindow.millis", "900000", 0);
+
+        // NOTE(review): this message is emitted unconditionally; nothing here checks for old state.
+        LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
+        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+        executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Reads an integer startup property, falling back to the given default when it is absent.
+     *
+     * @param key property name
+     * @param defaultValue value used when the property is not set
+     * @param minimum smallest accepted value
+     * @return the parsed value
+     * @throws FalconException if the value is not an integer or is below the minimum
+     */
+    private int readIntProperty(String key, String defaultValue, int minimum) throws FalconException {
+        String raw = StartupProperties.get().getProperty(key, defaultValue);
+        int value;
+        try {
+            value = Integer.parseInt(raw.trim());
+        } catch (NumberFormatException e) {
+            throw new FalconException("Startup property " + key + " must be an integer but was: " + raw, e);
+        }
+        if (value < minimum) {
+            throw new FalconException("Startup property " + key + " must be at least " + minimum
+                    + " but was: " + value);
+        }
+        return value;
+    }
+
+    /**
+     * Marks a feed instance as available by removing it from the pending instances.
+     *
+     * The delete is issued only when pending instances are stored for the feed and cluster; feeds
+     * without an sla tag have nothing stored. The emptiness check used to be inverted, so the delete
+     * ran only when there was nothing to delete.
+     *
+     * @param feedName name of the feed
+     * @param clusterName name of the cluster
+     * @param nominalTime nominal time of the instance that became available
+     * @throws FalconException declared for callers; not thrown by the visible code
+     */
+    public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime)
+        throws FalconException {
+        // argument order follows the placeholders: feed, instance, cluster
+        LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName,
+                nominalTime, clusterName);
+        List<Date> instances = MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName,
+                EntityType.FEED.toString());
+        // Slas for feeds not having sla tag are not stored.
+        if (!CollectionUtils.isEmpty(instances)){
+            MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime,
+                    EntityType.FEED.toString());
+        }
+    }
+
+    private FileSystem initializeFileSystem() {
+        try {
+            fileSystem = 
HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
+            if (!fileSystem.exists(storePath)) {
+                LOG.info("Creating directory for pending feed instances: {}", 
storePath);
+                // set permissions so config store dir is owned by falcon alone
+                HadoopClientFactory.mkdirs(fileSystem, storePath, 
STORE_PERMISSION);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to bring up feed sla store for 
path: " + storePath, e);
+        }
+    }
+
+    /**
+     * No-op: this method releases nothing.
+     *
+     * @throws FalconException declared by the service contract; never thrown here
+     */
+    @Override
+    public void destroy() throws FalconException {
+        // intentionally empty
+    }
+
+    /**
+     * Periodic task: re-checks the pending instances of feeds and processes, then registers new
+     * pending instances up to lookAheadWindowMillis in the future. It does nothing when no entity
+     * is monitored. Any failure is logged and not rethrown.
+     */
+    private class Monitor implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().isEmpty()) {
+                    return;
+                }
+                final String feedType = EntityType.FEED.toString();
+                final String processType = EntityType.PROCESS.toString();
+                checkPendingInstanceAvailability(feedType);
+                checkPendingInstanceAvailability(processType);
+
+                // add instances from the last checked time up to the look-ahead window from now
+                // (some buffer for status check)
+                final long nowMillis = new Date().getTime();
+                final Date newCheckPoint = new Date(nowMillis + lookAheadWindowMillis);
+                addNewPendingFeedInstances(newCheckPoint, feedType);
+                addNewPendingFeedInstances(newCheckPoint, processType);
+            } catch (Throwable e) {
+                LOG.error("Feed SLA monitoring failed: ", e);
+            }
+        }
+    }
+
+
+    /**
+     * Adds pending instances for every monitored entity of the given type, on each of its clusters
+     * that belongs to the current colo, up to (but excluding) the given time and the end of the
+     * entity's validity on that cluster.
+     *
+     * Generation resumes one millisecond after the last instance time recorded for the entity, or
+     * from getInitialStartTime when nothing is recorded. The last instance time is looked up with
+     * the entity type being processed; it used to be looked up with the FEED type even for processes.
+     *
+     * NOTE(review): the list from getAllMonitoredFeed() is not filtered by type here; if it can hold
+     * both feeds and processes, the entity lookup may fail for names of the other type - confirm.
+     *
+     * @param to exclusive upper bound for the generated instance times
+     * @param entityType type string of the entities to process (feed or process)
+     * @throws FalconException if an entity or cluster lookup fails
+     */
+    void addNewPendingFeedInstances(Date to, String entityType) throws FalconException {
+        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+        List<MonitoredEntityBean> monitoredBeans = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
+        for (MonitoredEntityBean monitoredEntityBean : monitoredBeans) {
+            String entityName = monitoredEntityBean.getFeedName();
+            Entity entity = EntityUtil.getEntity(entityType, entityName);
+            Set<String> clusterNames = EntityUtil.getClustersDefined(entity);
+            List<org.apache.falcon.entity.v0.cluster.Cluster> entityClusters = new ArrayList<>();
+            for (String clusterName : clusterNames) {
+                entityClusters.add(ClusterHelper.getCluster(clusterName));
+            }
+            for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : entityClusters) {
+                if (!currentClusters.contains(entityCluster.getName())) {
+                    continue;
+                }
+                // get start of instances from the database, for the type being processed
+                Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(entityName, entityType);
+                Pair<String, String> key = new Pair<>(entity.getName(), entityCluster.getName());
+                if (nextInstanceTime == null) {
+                    nextInstanceTime = getInitialStartTime(entity, entityCluster.getName(), entityType);
+                } else {
+                    // step past the last recorded instance so it is not added again
+                    nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
+                }
+
+                Set<Date> instances = new HashSet<>();
+                org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
+                        EntityUtil.getEntity(EntityType.CLUSTER, entityCluster.getName());
+                nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime);
+                Date endDate;
+                if (entityType.equals(EntityType.FEED.toString())) {
+                    endDate = FeedHelper.getClusterValidity((Feed) entity, currentCluster.getName()).getEnd();
+                } else {
+                    endDate = ProcessHelper.getClusterValidity((Process) entity,
+                            currentCluster.getName()).getEnd();
+                }
+                while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) {
+                    LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key);
+                    instances.add(nextInstanceTime);
+                    nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
+                    nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime);
+                }
+
+                // store only after the whole range was computed successfully
+                for (Date date : instances) {
+                    MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), entityCluster.getName(),
+                            date, entityType);
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Checks the availability of the pending instances of the given entity type and removes the
+     * ones which have become available.
+     *
+     * Instances are deleted with the same entity type they were looked up with; the delete used to
+     * be issued with the FEED type regardless of the type being checked.
+     *
+     * @param entityType type string of the entities to check (feed or process)
+     * @throws FalconException if an entity lookup fails
+     */
+    private void checkPendingInstanceAvailability(String entityType) throws FalconException {
+        // Several beans can name the same entity and cluster (presumably one bean per pending
+        // instance - TODO confirm); the nominal instances of each pair need checking only once.
+        Set<Pair<String, String>> visited = new HashSet<>();
+        for (PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()) {
+            String entityName = pendingInstanceBean.getEntityName();
+            String clusterName = pendingInstanceBean.getClusterName();
+            if (!visited.add(new Pair<>(entityName, clusterName))) {
+                continue;
+            }
+            for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
+                    entityType)) {
+                boolean status = checkEntityInstanceAvailability(entityName, clusterName, date, entityType);
+                if (status) {
+                    MONITORING_JDBC_STATE_STORE.deletePendingInstance(entityName, clusterName, date,
+                            entityType);
+                }
+            }
+        }
+    }
+
+    // checks whether a given feed instance is available or not
+    private boolean checkEntityInstanceAvailability(String entityName, String 
clusterName, Date nominalTime,
+                                                    String entityType) throws
+        FalconException {
+        Entity entity = EntityUtil.getEntity(entityType, entityName);
+
+        try {
+            if (entityType.equals(EntityType.PROCESS.toString())){
+                LOG.debug("Checking instance availability status for 
entity:{}, cluster:{}, "
+                        + "instanceTime:{}", entity.getName(), clusterName, 
nominalTime, entityType);
+                AbstractWorkflowEngine wfEngine = 
WorkflowEngineFactory.getWorkflowEngine();
+                InstancesResult instancesResult = wfEngine.getStatus(entity, 
nominalTime, nominalTime, null, null);
+                if 
(instancesResult.getStatus().equals(APIResult.Status.SUCCEEDED)){
+                    LOG.debug("Entity instance(feed:{}, cluster:{}, 
instanceTime:{}) is available.", entity.getName(),
+                            clusterName, nominalTime);
+                    return true;
+                }
+                return false;
+            }
+            if (entityType.equals(EntityType.FEED.toString())){
+                LOG.debug("Checking instance availability status for feed:{}, 
cluster:{}, instanceTime:{}",
+                        entity.getName(), clusterName, nominalTime);
+
+                FeedInstanceStatus.AvailabilityStatus status = 
FeedHelper.getFeedInstanceStatus((Feed) entity,
+                        clusterName, nominalTime);
+                if 
(status.equals(FeedInstanceStatus.AvailabilityStatus.AVAILABLE)
+                        || 
status.equals(FeedInstanceStatus.AvailabilityStatus.EMPTY)) {
+                    LOG.debug("Feed instance(feed:{}, cluster:{}, 
instanceTime:{}) is available.", entity.getName(),
+                            clusterName, nominalTime);
+                    return true;
+                }
+            }
+        } catch (Throwable e) {
+            LOG.error("Couldn't find status for Entity:{}, cluster:{}, 
entityType{}", entityName, clusterName,
+                    entityType, e);
+        }
+        LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not 
available.", entity.getName(),
+            clusterName, nominalTime);
+        return false;
+    }
+
+
+    /**
+     * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between the given time
+     * range which have missed slaLow or slaHigh.
+     *
+     * Every pending instance bean in the state store is visited; beans whose feed and cluster do not
+     * resolve to an SLA are skipped. Start time and end time are both inclusive.
+     *
+     * NOTE(review): every bean is looked up as a FEED; if the store can also hold process
+     * instances, this lookup presumably fails for them - confirm.
+     *
+     * @param start start time, inclusive
+     * @param end end time, inclusive
+     * @return Set of pending feed instances belonging to the given range which have missed SLA
+     * @throws FalconException if a feed lookup or SLA evaluation fails
+     */
+    public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(Date start, Date end)
+        throws FalconException {
+        final Set<SchedulableEntityInstance> alerts = new HashSet<>();
+        for (PendingInstanceBean pending : MONITORING_JDBC_STATE_STORE.getAllInstances()) {
+            final String feedName = pending.getEntityName();
+            final String clusterName = pending.getClusterName();
+            final Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
+            final Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
+            final Sla sla = FeedHelper.getSLA(feedCluster, feed);
+            if (sla == null) {
+                continue;
+            }
+            final List<Date> pendingTimes = MONITORING_JDBC_STATE_STORE.getNominalInstances(
+                    pending.getEntityName(), pending.getClusterName(), EntityType.FEED.toString());
+            for (Pair<Date, String> missed : getFeedSLAStatus(sla, start, end, pendingTimes)) {
+                final SchedulableEntityInstance alert =
+                        new SchedulableEntityInstance(feedName, clusterName, missed.first, EntityType.FEED);
+                alert.setTags(missed.second);
+                alerts.add(alert);
+            }
+        }
+        return alerts;
+    }
+
+    /**
+     * Returns the pending instances of one feed or process on one cluster which lie in the given
+     * time range and have missed their SLA (slaLow or slaHigh for feeds, the critical limit for
+     * processes).
+     *
+     * An empty set is returned when the store yields no instance list, when the entity type is
+     * neither feed nor process, or when no SLA resolves for the entity on that cluster.
+     *
+     * @param entityName name of the feed or process
+     * @param clusterName cluster name
+     * @param start start time, inclusive
+     * @param end end time, inclusive
+     * @param entityType type string of the entity (feed or process)
+     * @return Pending instances of the given entity which belong to the given time range and have missed SLA.
+     * @throws FalconException if the entity lookup or SLA evaluation fails
+     */
+    public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName,
+                                          Date start, Date end, String entityType) throws FalconException {
+
+        final Set<SchedulableEntityInstance> alerts = new HashSet<>();
+        final List<Date> pendingTimes = MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
+                entityType);
+        final boolean isFeed = EntityType.FEED.toString().equals(entityType);
+        final boolean isProcess = EntityType.PROCESS.toString().equals(entityType);
+        if (pendingTimes == null || !(isFeed || isProcess)) {
+            return alerts;
+        }
+        final Entity entity = EntityUtil.getEntity(entityType, entityName);
+
+        final Set<Pair<Date, String>> missedInstances;
+        final EntityType alertType;
+        if (isFeed) {
+            final Sla feedSla = FeedHelper.getSLA(clusterName, (Feed) entity);
+            if (feedSla == null) {
+                return alerts;
+            }
+            missedInstances = getFeedSLAStatus(feedSla, start, end, pendingTimes);
+            alertType = EntityType.FEED;
+        } else {
+            final org.apache.falcon.entity.v0.process.Sla processSla =
+                    ProcessHelper.getSLA(clusterName, (Process) entity);
+            if (processSla == null) {
+                return alerts;
+            }
+            missedInstances = getProcessSLAStatus(processSla, start, end, pendingTimes);
+            alertType = EntityType.PROCESS;
+        }
+        for (Pair<Date, String> missed : missedInstances) {
+            final SchedulableEntityInstance alert =
+                    new SchedulableEntityInstance(entityName, clusterName, missed.first, alertType);
+            alert.setTags(missed.second);
+            alerts.add(alert);
+        }
+        return alerts;
+    }
+
+    Set<Pair<Date, String>> getFeedSLAStatus(Sla sla, Date start, Date end, 
List<Date> missingInstances)
+        throws FalconException {
+        Date now = new Date();
+        Frequency slaLow = sla.getSlaLow();
+        Frequency slaHigh = sla.getSlaHigh();
+        Set<Pair<Date, String>> result = new HashSet<>();
+        for (Date nominalTime : missingInstances) {
+            if (!nominalTime.before(start) && !nominalTime.after(end)) {
+                ExpressionHelper.setReferenceDate(nominalTime);
+                ExpressionHelper evaluator = ExpressionHelper.get();
+                Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), 
Long.class);
+                Long slaLowDuration = evaluator.evaluate(slaLow.toString(), 
Long.class);
+                Date slaCriticalTime = new Date(nominalTime.getTime() + 
slaHighDuration);
+                Date slaWarnTime = new Date(nominalTime.getTime() + 
slaLowDuration);
+                if (slaCriticalTime.before(now)) {
+                    result.add(new Pair<>(nominalTime, TAG_CRITICAL));
+                } else if (slaWarnTime.before(now)) {
+                    result.add(new Pair<>(nominalTime, TAG_WARN));
+                }
+            }
+        }
+        return result;
+    }
+
+    Set<Pair<Date, String>> 
getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
+                                                Date end, List<Date> 
missingInstances) throws FalconException {
+        Date now = new Date();
+        Frequency slaHigh = sla.getShouldEndIn();
+        Set<Pair<Date, String>> result = new HashSet<>();
+        for (Date nominalTime : missingInstances) {
+            if (!nominalTime.before(start) && !nominalTime.after(end)) {
+                ExpressionHelper.setReferenceDate(nominalTime);
+                ExpressionHelper evaluator = ExpressionHelper.get();
+                Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), 
Long.class);
+                Date slaCriticalTime = new Date(nominalTime.getTime() + 
slaHighDuration);
+                if (slaCriticalTime.before(now)) {
+                    result.add(new Pair<>(nominalTime, TAG_CRITICAL));
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Determines the time from which pending instances are first generated for an entity on a
+     * cluster: the earlier of the entity's validity start on that cluster and the current time
+     * minus the SLA duration (slaLow for feeds, shouldEndIn for processes).
+     *
+     * The type check used to be inverted: feeds were cast to Process and processes to Feed, which
+     * fails with a ClassCastException. Any type other than feed is handled as a process.
+     *
+     * @param entity feed or process, matching entityType
+     * @param clusterName name of the cluster
+     * @param entityType type string of the entity (feed or process)
+     * @return the initial start time for instance generation
+     * @throws FalconException declared for callers
+     * @throws IllegalStateException if no SLA resolves for the entity on the cluster
+     */
+    @VisibleForTesting
+    Date getInitialStartTime(Entity entity, String clusterName, String entityType) throws FalconException {
+        if (entityType.equals(EntityType.FEED.toString())){
+            Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity);
+            if (sla == null) {
+                throw new IllegalStateException("InitialStartTime can not be determined as the feed: "
+                    + entity.getName() + " and cluster: " + clusterName + " does not have any sla");
+            }
+            Date startTime = FeedHelper.getFeedValidityStart((Feed) entity, clusterName);
+            Frequency slaLow = sla.getSlaLow();
+            Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
+            return startTime.before(slaTime) ? startTime : slaTime;
+        } else{
+            org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(clusterName, (Process)entity);
+            if (sla == null) {
+                throw new IllegalStateException("InitialStartTime can not be determined as the process: "
+                        + entity.getName() + " and cluster: " + clusterName + " does not have any sla");
+            }
+            Date startTime = ProcessHelper.getProcessValidityStart((Process) entity, clusterName);
+            Frequency shouldEndIn = sla.getShouldEndIn();
+            Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(shouldEndIn));
+            return startTime.before(slaTime) ? startTime : slaTime;
+        }
+    }
+
+    /**
+     * Removes an entity instance from the pending instances, provided pending instances are stored
+     * for the entity and cluster. A date that cannot be parsed is logged and nothing is removed.
+     *
+     * @param clusterName name of the cluster
+     * @param entityName name of the entity
+     * @param date nominal time of the instance, parsed with DateUtil.parseDateFalconTZ
+     * @param entityType entity type string passed through to the state store
+     */
+    public void makeProcessInstanceAvailable(String clusterName, String entityName, String date, String entityType) {
+        final Date nominalTime;
+        try {
+            nominalTime = DateUtil.parseDateFalconTZ(date);
+        } catch (ParseException e) {
+            LOG.error("Exception while translating the date:", e);
+            return;
+        }
+        if (nominalTime == null) {
+            return;
+        }
+        final List<Date> pendingTimes = MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
+                entityType);
+        if (CollectionUtils.isEmpty(pendingTimes)) {
+            return;
+        }
+        MONITORING_JDBC_STATE_STORE.deletePendingInstance(entityName, clusterName, nominalTime, entityType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java 
b/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java
deleted file mode 100644
index c09c7ae..0000000
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.service;
-
-
-import java.util.Date;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.ArrayList;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
-import org.apache.falcon.persistence.PendingInstanceBean;
-import org.apache.falcon.resource.SchedulableEntityInstance;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
-  * Service to know which all feeds have missed SLA.
-  */
-public final class FeedSLAAlertService implements FalconService, 
EntitySLAListener {
-
-    private static final String NAME = "FeedSLAAlertService";
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(FeedSLAAlertService.class);
-
-    private MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
-
-    private Set<EntitySLAListener> listeners = new 
LinkedHashSet<EntitySLAListener>();
-
-    private static final FeedSLAAlertService SERVICE = new 
FeedSLAAlertService();
-
-    public static FeedSLAAlertService get() {
-        return SERVICE;
-    }
-
-    private FeedSLAAlertService(){}
-
-
-    /**
-     * Returns the name of this service, i.e. the {@code NAME} constant of this class.
-     */
-    @Override
-    public String getName() {
-        return NAME;
-    }
-
-    /**
-     * Registers the configured SLA listeners and schedules the periodic SLA check.
-     *
-     * Listener class names come from the comma separated startup property
-     * {@code feedAlert.listeners}; blank entries are skipped and an unset property
-     * is treated as "no listeners". The check interval is derived from
-     * {@code feed.sla.statusCheck.frequency.seconds} (default 600).
-     *
-     * @throws FalconException if raised while instantiating a listener
-     * @throws NumberFormatException if the frequency property is not an integer
-     */
-    @Override
-    public void init() throws FalconException {
-        // Default to an empty string so that a missing property does not fail with a NullPointerException.
-        String listenerClassNames = StartupProperties.get().getProperty("feedAlert.listeners", "");
-        for (String listenerClassName : listenerClassNames.split(",")) {
-            listenerClassName = listenerClassName.trim();
-            if (listenerClassName.isEmpty()) {
-                continue;
-            }
-            EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
-            registerListener(listener);
-        }
-
-        String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
-        int statusCheckFrequencySeconds = Integer.parseInt(freq);
-
-        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
-        // NOTE(review): the extra 10 seconds presumably lets the monitoring service finish its
-        // own cycle (which uses the same frequency property) first - confirm.
-        executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds + 10, TimeUnit.SECONDS);
-    }
-
-    /**
-     * Adds a listener to the set notified by {@code highSLAMissed}.
-     *
-     * @param listener listener to add to the {@code listeners} set
-     */
-    public void registerListener(EntitySLAListener listener) {
-        listeners.add(listener);
-    }
-
-    /**
-     * Intentionally empty: nothing is released here.
-     * Note that the executor created in {@code init()} is not stopped by this method.
-     */
-    @Override
-    public void destroy() throws FalconException {
-
-    }
-
-
-    /**
-     * Task scheduled by {@code init()}; each run delegates to {@code processSLACandidates()}.
-     */
-    private class Monitor implements Runnable {
-
-        @Override
-        public void run() {
-            processSLACandidates();
-        }
-    }
-
-    /**
-     * Walks over every pending instance known to the store and records SLA misses.
-     *
-     * For each pending instance the SLA status is looked up for exactly its nominal time:
-     * <ul>
-     *   <li>no SLA miss: any alert entry for the instance is deleted;</li>
-     *   <li>tagged with TAG_WARN: an alert entry is stored with slaLow missed;</li>
-     *   <li>tagged with TAG_CRITICAL: the alert entry is updated and the listeners are
-     *       notified through {@code highSLAMissed}.</li>
-     * </ul>
-     * A FalconException stops the current run and is logged.
-     */
-    void processSLACandidates(){
-        //Get all feeds instances to be monitored
-        List<PendingInstanceBean> pendingInstanceBeanList = store.getAllInstances();
-        if (pendingInstanceBeanList.isEmpty()){
-            return;
-        }
-
-        LOG.debug("In processSLACandidates :{}", pendingInstanceBeanList.size());
-        try{
-            for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) {
-
-                String feedName = pendingInstanceBean.getFeedName();
-                String clusterName = pendingInstanceBean.getClusterName();
-                Date nominalTime = pendingInstanceBean.getNominalTime();
-                Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
-
-                Cluster cluster =  FeedHelper.getCluster(feed, clusterName);
-
-                Set<SchedulableEntityInstance> schedulableEntityInstances = FeedSLAMonitoringService.get().
-                        getFeedSLAMissPendingAlerts(feed.getName(), cluster.getName(), nominalTime, nominalTime);
-                if (schedulableEntityInstances.isEmpty()){
-                    store.deleteFeedAlertInstance(feed.getName(), cluster.getName(), nominalTime);
-                    // Only this instance is done; the remaining pending instances must still be examined.
-                    continue;
-                }
-                // The lookup range is a single nominal time, so the first element is the one of interest.
-                SchedulableEntityInstance schedulableEntityInstance = schedulableEntityInstances.iterator().next();
-
-                if (schedulableEntityInstance.getTags().contains(FeedSLAMonitoringService.TAG_WARN)) {
-                    store.putSLAAlertInstance(feedName, clusterName, nominalTime, true, false);
-                    //Mark in DB as SLA missed
-                    LOG.info("Feed: {} Cluster: {} Nominal Time: {} missed SLALow",
-                            feedName, clusterName, nominalTime);
-                } else if (schedulableEntityInstance.getTags().contains(FeedSLAMonitoringService.TAG_CRITICAL)){
-                    store.updateSLAAlertInstance(feedName, clusterName, nominalTime);
-                    LOG.info("Feed: {} Cluster: {} Nominal Time: {} missed SLAHigh",
-                            feedName, clusterName, nominalTime);
-                    highSLAMissed(feedName, EntityType.FEED, clusterName, nominalTime);
-                }
-            }
-        } catch (FalconException e){
-            LOG.error("Exception in FeedSLAALertService:", e);
-        }
-
-    }
-
-    /**
-     * Forwards a slaHigh miss to every registered listener, in registration order,
-     * and afterwards deletes the alert entry of the instance from the store.
-     *
-     * @param feedName name of the feed whose instance missed slaHigh
-     * @param entityType type of the entity, passed through to the listeners
-     * @param clusterName cluster on which the instance was expected
-     * @param nominalTime nominal time of the instance
-     * @throws FalconException if a listener raises it; the alert entry is then not deleted
-     */
-    @Override
-    public void highSLAMissed(String feedName, EntityType entityType, String clusterName, Date nominalTime)
-        throws FalconException {
-        for (EntitySLAListener registered : listeners) {
-            registered.highSLAMissed(feedName, entityType, clusterName, nominalTime);
-        }
-        store.deleteFeedAlertInstance(feedName, clusterName, nominalTime);
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java 
b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
deleted file mode 100644
index ed7bb08..0000000
--- 
a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.service;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.FeedInstanceStatus;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
-import org.apache.falcon.persistence.MonitoredFeedsBean;
-import org.apache.falcon.persistence.PendingInstanceBean;
-import org.apache.falcon.resource.SchedulableEntityInstance;
-import org.apache.falcon.util.DateUtil;
-import org.apache.falcon.util.DeploymentUtil;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Service to monitor Feed SLAs.
- */
-public final class FeedSLAMonitoringService implements 
ConfigurationChangeListener, FalconService {
-    private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
-
-    private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE 
= new MonitoringJdbcStateStore();
-
-    private static final int ONE_MS = 1;
-
-    private static final FeedSLAMonitoringService SERVICE = new 
FeedSLAMonitoringService();
-
-    public static final String TAG_CRITICAL = "Missed-SLA-High";
-    public static final String TAG_WARN = "Missed-SLA-Low";
-
-    private FeedSLAMonitoringService() {
-
-    }
-
-    public static FeedSLAMonitoringService get() {
-        return SERVICE;
-    }
-
-    /**
-     * Permissions for storePath.
-     */
-    private static final FsPermission STORE_PERMISSION = new 
FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-
-
-    /**
-     * Frequency in seconds of "status check" for pending feed instances.
-     */
-    private int statusCheckFrequencySeconds; // 10 minutes
-
-
-    /**
-     * Time Duration (in milliseconds) in future for generating pending feed 
instances.
-     *
-     * In every cycle pending feed instances are added for monitoring, till 
this time in future.
-     */
-    private int lookAheadWindowMillis; // 15 MINUTES
-
-
-    /**
-     * Filesystem used for serializing and deserializing.
-     */
-    private FileSystem fileSystem;
-
-    /**
-     * Working directory for the feed sla monitoring service.
-     */
-    private Path storePath;
-
-    /**
-     * Path to store the state of the monitoring service.
-     */
-    private Path filePath;
-
-    /**
-     * Starts monitoring a newly added feed.
-     *
-     * Entities other than feeds, and feeds without locations, are ignored. The feed is
-     * stored as monitored once for every cluster of the current colo that has an SLA.
-     *
-     * @param entity entity that was added
-     * @throws FalconException declared by the listener contract
-     */
-    @Override
-    public void onAdd(Entity entity) throws FalconException {
-        if (entity.getEntityType() != EntityType.FEED) {
-            return;
-        }
-        Feed addedFeed = (Feed) entity;
-        // currently sla service is enabled only for fileSystemStorage
-        if (addedFeed.getLocations() == null) {
-            return;
-        }
-
-        Set<String> localClusters = DeploymentUtil.getCurrentClusters();
-        for (Cluster feedCluster : addedFeed.getClusters().getClusters()) {
-            boolean hasSlaInThisColo = localClusters.contains(feedCluster.getName())
-                    && FeedHelper.getSLA(feedCluster, addedFeed) != null;
-            if (hasSlaInThisColo) {
-                LOG.debug("Adding feed:{} for monitoring", addedFeed.getName());
-                MONITORING_JDBC_STATE_STORE.putMonitoredFeed(addedFeed.getName());
-            }
-        }
-    }
-
-    /**
-     * Stops monitoring a removed feed.
-     *
-     * Entities other than feeds, and feeds without locations, are ignored. For every
-     * cluster of the current colo that has an SLA, the monitored-feed entry and the
-     * pending instances of that cluster are deleted from the store.
-     *
-     * @param entity entity that was removed
-     * @throws FalconException declared by the listener contract
-     */
-    @Override
-    public void onRemove(Entity entity) throws FalconException {
-        if (entity.getEntityType() != EntityType.FEED) {
-            return;
-        }
-        Feed removedFeed = (Feed) entity;
-        // currently sla service is enabled only for fileSystemStorage
-        if (removedFeed.getLocations() == null) {
-            return;
-        }
-
-        Set<String> localClusters = DeploymentUtil.getCurrentClusters();
-        for (Cluster feedCluster : removedFeed.getClusters().getClusters()) {
-            if (!localClusters.contains(feedCluster.getName())) {
-                continue;
-            }
-            if (FeedHelper.getSLA(feedCluster, removedFeed) == null) {
-                continue;
-            }
-            MONITORING_JDBC_STATE_STORE.deleteMonitoringFeed(removedFeed.getName());
-            MONITORING_JDBC_STATE_STORE.deletePendingInstances(removedFeed.getName(), feedCluster.getName());
-        }
-    }
-
-    /**
-     * Tells whether the feed has locations and an SLA on at least one cluster of the current colo.
-     *
-     * @param feed feed to inspect
-     * @return true if some cluster of the feed is a current cluster and has an SLA, false otherwise
-     */
-    private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) {
-        if (feed.getLocations() == null) {
-            return false;
-        }
-        Set<String> localClusters = DeploymentUtil.getCurrentClusters();
-        for (Cluster feedCluster : feed.getClusters().getClusters()) {
-            boolean isLocal = localClusters.contains(feedCluster.getName());
-            if (isLocal && FeedHelper.getSLA(feedCluster, feed) != null) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Reacts to an updated feed definition.
-     *
-     * <ul>
-     *   <li>new definition no longer monitored in this colo: handled like a removal of the old feed;</li>
-     *   <li>old definition was not monitored but the new one is: handled like an addition;</li>
-     *   <li>both monitored: pending instances are deleted for every cluster that had an SLA
-     *       in the old definition and has none in the new one.</li>
-     * </ul>
-     *
-     * @param oldEntity definition before the update
-     * @param newEntity definition after the update
-     * @throws FalconException propagated from the add/remove handling
-     */
-    @Override
-    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
-        if (newEntity.getEntityType() != EntityType.FEED) {
-            return;
-        }
-        Feed previous = (Feed) oldEntity;
-        Feed updated = (Feed) newEntity;
-
-        if (!isSLAMonitoringEnabledInCurrentColo(updated)) {
-            onRemove(previous);
-            return;
-        }
-        if (!isSLAMonitoringEnabledInCurrentColo(previous)) {
-            onAdd(updated);
-            return;
-        }
-
-        // Collect first, delete afterwards: nothing is deleted unless every cluster could be inspected.
-        List<String> clustersThatLostSla = new ArrayList<>();
-        for (String clusterName : EntityUtil.getClustersDefinedInColos(previous)) {
-            boolean hadSla = FeedHelper.getSLA(clusterName, previous) != null;
-            if (hadSla && FeedHelper.getSLA(clusterName, updated) == null) {
-                clustersThatLostSla.add(clusterName);
-            }
-        }
-        for (String clusterName : clustersThatLostSla) {
-            MONITORING_JDBC_STATE_STORE.deletePendingInstances(updated.getName(), clusterName);
-        }
-    }
-
-    /**
-     * Handles a reloaded entity exactly like a newly added one by delegating to {@code onAdd}.
-     */
-    @Override
-    public void onReload(Entity entity) throws FalconException {
-        onAdd(entity);
-    }
-
-    /**
-     * Returns the simple class name of {@code FeedSLAMonitoringService} as the service name.
-     */
-    @Override
-    public String getName() {
-        return FeedSLAMonitoringService.class.getSimpleName();
-    }
-
-    /**
-     * Reads the service configuration from the startup properties, prepares the store
-     * directory and schedules the {@code Monitor} task.
-     *
-     * Properties used: {@code feed.sla.service.store.uri},
-     * {@code feed.sla.statusCheck.frequency.seconds} (default 600) and
-     * {@code feed.sla.lookAheadWindow.millis} (default 900000).
-     *
-     * @throws FalconException declared by the service contract
-     */
-    @Override
-    public void init() throws FalconException {
-        StartupProperties startupProps = StartupProperties.get();
-
-        storePath = new Path(startupProps.getProperty("feed.sla.service.store.uri"));
-        filePath = new Path(storePath, "feedSLAMonitoringService");
-        fileSystem = initializeFileSystem();
-
-        String checkFrequency = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
-        statusCheckFrequencySeconds = Integer.parseInt(checkFrequency);
-
-        String lookAhead = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000");
-        lookAheadWindowMillis = Integer.parseInt(lookAhead);
-
-        LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
-
-        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
-        scheduler.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
-    }
-
-    /**
-     * Removes one feed instance from the pending-SLA store because it has become available.
-     *
-     * The delete is only issued when the store tracks pending instances for the feed on
-     * the given cluster; feeds without an sla tag are never stored, so there is nothing
-     * to delete for them.
-     *
-     * @param feedName name of the feed
-     * @param clusterName name of the cluster the instance belongs to
-     * @param nominalTime nominal time of the instance that became available
-     * @throws FalconException declared for callers; not raised by the visible code itself
-     */
-    public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime)
-        throws FalconException {
-        // Argument order follows the placeholders: feed, instance, cluster.
-        LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName,
-                nominalTime, clusterName);
-        List<Date> instances = MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName);
-        // Slas for feeds not having sla tag are not stored.
-        if (!CollectionUtils.isEmpty(instances)) {
-            MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime);
-        }
-    }
-
-    /**
-     * Opens the file system behind {@code storePath} and creates the store directory,
-     * with {@code STORE_PERMISSION}, when it does not exist yet.
-     *
-     * The {@code fileSystem} field is assigned as a side effect.
-     *
-     * @return the file system that was opened
-     * @throws RuntimeException wrapping any failure while opening or preparing the store
-     */
-    private FileSystem initializeFileSystem() {
-        try {
-            fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
-            boolean storeMissing = !fileSystem.exists(storePath);
-            if (storeMissing) {
-                LOG.info("Creating directory for pending feed instances: {}", storePath);
-                // set permissions so config store dir is owned by falcon alone
-                HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
-            }
-            return fileSystem;
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to bring up feed sla store for path: " + storePath, e);
-        }
-    }
-
-    /**
-     * Intentionally empty: nothing is released here.
-     * Note that the executor created in {@code init()} is not stopped by this method.
-     */
-    @Override
-    public void destroy() throws FalconException {
-    }
-
-    /**
-     * Periodic task: re-checks the pending instances and registers new pending instances
-     * up to {@code lookAheadWindowMillis} into the future. It does nothing when no feed is
-     * monitored, and every failure is logged instead of propagated.
-     */
-    private class Monitor implements Runnable {
-
-        @Override
-        public void run() {
-            try {
-                if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().isEmpty()) {
-                    return;
-                }
-                checkPendingInstanceAvailability();
-
-                // add instances from the last checked time up to the look-ahead window from now
-                // (some buffer for status check)
-                long nowMillis = new Date().getTime();
-                addNewPendingFeedInstances(new Date(nowMillis + lookAheadWindowMillis));
-            } catch (Throwable e) {
-                LOG.error("Feed SLA monitoring failed: ", e);
-            }
-        }
-    }
-
-
-    /**
-     * Registers new pending instances for every monitored feed on the clusters of the current colo.
-     *
-     * Generation starts one millisecond after the last instance time known to the store, or at
-     * {@code getInitialStartTime} when the store has none, and stops at {@code to} or at the end
-     * of the feed's validity on the cluster, whichever comes first (both exclusive).
-     *
-     * @param to upper bound (exclusive) for the nominal time of generated instances
-     * @throws FalconException propagated from entity lookups and start-time computation
-     */
-    void addNewPendingFeedInstances(Date to) throws FalconException {
-        Set<String> localClusters = DeploymentUtil.getCurrentClusters();
-        List<MonitoredFeedsBean> monitoredFeeds = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
-        for (MonitoredFeedsBean monitored : monitoredFeeds) {
-            String feedName = monitored.getFeedName();
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
-            for (Cluster feedCluster : feed.getClusters().getClusters()) {
-                if (!localClusters.contains(feedCluster.getName())) {
-                    continue;
-                }
-                // get start of instances from the database
-                Date lastKnown = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(feedName);
-                Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName());
-                Date cursor;
-                if (lastKnown == null) {
-                    cursor = getInitialStartTime(feed, feedCluster.getName());
-                } else {
-                    cursor = new Date(lastKnown.getTime() + ONE_MS);
-                }
-
-                Set<Date> newInstances = new HashSet<>();
-                org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
-                        EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
-                cursor = EntityUtil.getNextStartTime(feed, clusterEntity, cursor);
-                Date validityEnd = FeedHelper.getClusterValidity(feed, clusterEntity.getName()).getEnd();
-                while (cursor.before(to) && cursor.before(validityEnd)) {
-                    LOG.debug("Adding instance={} for <feed,cluster>={}", cursor, key);
-                    newInstances.add(cursor);
-                    // step just past the current instance so the next start time is a later one
-                    Date justAfter = new Date(cursor.getTime() + ONE_MS);
-                    cursor = EntityUtil.getNextStartTime(feed, clusterEntity, justAfter);
-                }
-
-                // Persist only after the whole range has been computed.
-                for (Date instanceTime : newInstances) {
-                    MONITORING_JDBC_STATE_STORE.putPendingInstances(feed.getName(), feedCluster.getName(),
-                            instanceTime);
-                }
-            }
-        }
-    }
-
-
-    /**
-     * Checks the availability of all the pendingInstances and removes the ones which have become available.
-     *
-     * Every pending bean already identifies one instance (feed, cluster, nominal time), so each
-     * instance is checked exactly once instead of re-reading and re-checking all instances of the
-     * feed for every bean.
-     *
-     * @throws FalconException propagated from the availability check
-     */
-    private void checkPendingInstanceAvailability() throws FalconException {
-        for (PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()) {
-            String feedName = pendingInstanceBean.getFeedName();
-            String clusterName = pendingInstanceBean.getClusterName();
-            Date nominalTime = pendingInstanceBean.getNominalTime();
-            if (checkFeedInstanceAvailability(feedName, clusterName, nominalTime)) {
-                MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime);
-            }
-        }
-    }
-
-    /**
-     * Checks whether a given feed instance is available or not.
-     *
-     * An instance counts as available when its status is AVAILABLE or EMPTY. Any failure
-     * while determining the status is logged and reported as "not available".
-     *
-     * @param feedName name of the feed
-     * @param clusterName name of the cluster
-     * @param nominalTime nominal time of the instance
-     * @return true if the instance is available, false otherwise
-     * @throws FalconException if the feed entity cannot be looked up
-     */
-    private boolean checkFeedInstanceAvailability(String feedName, String clusterName, Date nominalTime)
-        throws FalconException {
-        Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
-
-        try {
-            LOG.debug("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}",
-                    feed.getName(), clusterName, nominalTime);
-            FeedInstanceStatus.AvailabilityStatus availability =
-                    FeedHelper.getFeedInstanceStatus(feed, clusterName, nominalTime);
-            boolean present = availability.equals(FeedInstanceStatus.AvailabilityStatus.AVAILABLE)
-                    || availability.equals(FeedInstanceStatus.AvailabilityStatus.EMPTY);
-            if (present) {
-                LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is available.",
-                        feed.getName(), clusterName, nominalTime);
-                return true;
-            }
-        } catch (Throwable e) {
-            LOG.error("Couldn't find status for feed:{}, cluster:{}", feedName, clusterName, e);
-        }
-        LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not available.",
-                feed.getName(), clusterName, nominalTime);
-        return false;
-    }
-
-
-    /**
-     * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range
-     * which have missed slaLow or slaHigh.
-     *
-     * Only feeds which have defined sla in their definition are considered.
-     * Only the feed instances between the given time range are considered.
-     * Start time and end time are both inclusive.
-     * @param start start time, inclusive
-     * @param end end time, inclusive
-     * @return Set of pending feed instances belonging to the given range which have missed SLA
-     * @throws FalconException
-     */
-    public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end)
-        throws FalconException {
-        Set<SchedulableEntityInstance> missed = new HashSet<>();
-        for (PendingInstanceBean pending : MONITORING_JDBC_STATE_STORE.getAllInstances()) {
-            String feedName = pending.getFeedName();
-            String clusterName = pending.getClusterName();
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
-            Cluster cluster = FeedHelper.getCluster(feed, clusterName);
-            Sla sla = FeedHelper.getSLA(cluster, feed);
-            if (sla == null) {
-                continue;
-            }
-            List<Date> pendingTimes = MONITORING_JDBC_STATE_STORE.getNominalInstances(pending.getFeedName(),
-                    pending.getClusterName());
-            for (Pair<Date, String> status : getSLAStatus(sla, start, end, pendingTimes)) {
-                SchedulableEntityInstance instance =
-                        new SchedulableEntityInstance(feedName, clusterName, status.first, EntityType.FEED);
-                instance.setTags(status.second);
-                missed.add(instance);
-            }
-        }
-        return missed;
-    }
-
-    /**
-     * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances of a given feed between
-     * the given time range which missed sla. Only those instances are included which have missed
-     * either slaLow or slaHigh.
-     * @param feedName name of the feed
-     * @param clusterName cluster name
-     * @param start start time, inclusive
-     * @param end end time, inclusive
-     * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA.
-     * @throws FalconException
-     */
-    public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(String feedName, String clusterName,
-                                                                 Date start, Date end) throws FalconException {
-
-        Set<SchedulableEntityInstance> missed = new HashSet<>();
-        List<Date> pendingTimes = MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName);
-        Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
-        Cluster cluster = FeedHelper.getCluster(feed, clusterName);
-        Sla sla = FeedHelper.getSLA(cluster, feed);
-        if (pendingTimes == null || sla == null) {
-            // nothing pending, or no sla defined: nothing can have been missed
-            return missed;
-        }
-
-        for (Pair<Date, String> status : getSLAStatus(sla, start, end, pendingTimes)) {
-            SchedulableEntityInstance instance =
-                    new SchedulableEntityInstance(feedName, clusterName, status.first, EntityType.FEED);
-            instance.setTags(status.second);
-            missed.add(instance);
-        }
-        return missed;
-    }
-
-    /**
-     * Classifies the given missing instances against the sla, relative to the current time.
-     *
-     * Instances outside [start, end] are ignored. An instance whose slaHigh deadline has passed
-     * is tagged {@code TAG_CRITICAL}; otherwise, if its slaLow deadline has passed, it is tagged
-     * {@code TAG_WARN}. Instances that missed neither are not part of the result.
-     *
-     * @param sla sla definition providing slaLow and slaHigh
-     * @param start start time, inclusive
-     * @param end end time, inclusive
-     * @param missingInstances nominal times of the instances that are still pending
-     * @return pairs of (nominal time, tag) for every instance that missed an sla
-     * @throws FalconException propagated from expression evaluation
-     */
-    Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, List<Date> missingInstances)
-        throws FalconException {
-        Date now = new Date();
-        Frequency slaLow = sla.getSlaLow();
-        Frequency slaHigh = sla.getSlaHigh();
-        Set<Pair<Date, String>> statuses = new HashSet<>();
-        for (Date nominalTime : missingInstances) {
-            if (nominalTime.before(start) || nominalTime.after(end)) {
-                continue;
-            }
-            // The sla expressions are evaluated relative to the instance's nominal time.
-            ExpressionHelper.setReferenceDate(nominalTime);
-            ExpressionHelper evaluator = ExpressionHelper.get();
-            Long highMillis = evaluator.evaluate(slaHigh.toString(), Long.class);
-            Long lowMillis = evaluator.evaluate(slaLow.toString(), Long.class);
-            Date criticalDeadline = new Date(nominalTime.getTime() + highMillis);
-            Date warnDeadline = new Date(nominalTime.getTime() + lowMillis);
-
-            if (criticalDeadline.before(now)) {
-                statuses.add(new Pair<>(nominalTime, TAG_CRITICAL));
-            } else if (warnDeadline.before(now)) {
-                statuses.add(new Pair<>(nominalTime, TAG_WARN));
-            }
-        }
-        return statuses;
-    }
-
-    /**
-     * Determines where instance generation starts for a feed the store has no instances for yet:
-     * the earlier of the feed's validity start on the cluster and (now - slaLow).
-     *
-     * @param feed feed to compute the start time for
-     * @param clusterName cluster on which the feed is monitored
-     * @return the earlier of the validity start and the current time minus slaLow
-     * @throws IllegalStateException if the feed has no sla on the cluster
-     * @throws FalconException declared for callers
-     */
-    @VisibleForTesting
-    Date getInitialStartTime(Feed feed, String clusterName) throws FalconException {
-        Sla sla = FeedHelper.getSLA(clusterName, feed);
-        if (sla == null) {
-            throw new IllegalStateException("InitialStartTime can not be determined as the feed: "
-                + feed.getName() + " and cluster: " + clusterName + " does not have any sla");
-        }
-        Date validityStart = FeedHelper.getFeedValidityStart(feed, clusterName);
-        Frequency slaLow = sla.getSlaLow();
-        long nowMillis = DateUtil.now().getTime();
-        Date slaTime = new Date(nowMillis - DateUtil.getFrequencyInMillis(slaLow));
-        if (validityStart.before(slaTime)) {
-            return validityStart;
-        }
-        return slaTime;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git 
a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java 
b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
index a4a95be..8cf2b2d 100644
--- 
a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
+++ 
b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -25,6 +25,7 @@ import javax.persistence.Query;
 
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.service.FalconJPAService;
 import org.apache.falcon.tools.FalconStateStoreDBCLI;
@@ -85,40 +86,44 @@ public class MonitoringJdbcStateStoreTest extends 
AbstractTestBase {
 
     @Test
     public void testInsertRetrieveAndUpdate() throws Exception {
-        monitoringJdbcStateStore.putMonitoredFeed("test_feed1");
-        monitoringJdbcStateStore.putMonitoredFeed("test_feed2");
-        Assert.assertEquals("test_feed1", 
monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName());
+        monitoringJdbcStateStore.putMonitoredEntity("test_feed1", 
EntityType.FEED.toString());
+        monitoringJdbcStateStore.putMonitoredEntity("test_feed2", 
EntityType.FEED.toString());
+        Assert.assertEquals("test_feed1", 
monitoringJdbcStateStore.getMonitoredEntity("test_feed1",
+                EntityType.FEED.toString()).getFeedName());
         
Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(), 2);
 
-        monitoringJdbcStateStore.deleteMonitoringFeed("test_feed1");
-        monitoringJdbcStateStore.deleteMonitoringFeed("test_feed2");
+        monitoringJdbcStateStore.deleteMonitoringEntity("test_feed1", 
EntityType.FEED.toString());
+        monitoringJdbcStateStore.deleteMonitoringEntity("test_feed2", 
EntityType.FEED.toString());
         Date dateOne =  SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
         Date dateTwo =  SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
-        monitoringJdbcStateStore.putPendingInstances("test_feed1", 
"test_cluster", dateOne);
-        monitoringJdbcStateStore.putPendingInstances("test_feed1", 
"test_cluster", dateTwo);
-
-        
Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", 
"test_cluster").size(), 2);
-        monitoringJdbcStateStore.deletePendingInstance("test_feed1", 
"test_cluster", dateOne);
-        
Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", 
"test_cluster").size(), 1);
-        monitoringJdbcStateStore.deletePendingInstances("test_feed1", 
"test_cluster");
+        monitoringJdbcStateStore.putPendingInstances("test_feed1", 
"test_cluster", dateOne, EntityType.FEED.toString());
+        monitoringJdbcStateStore.putPendingInstances("test_feed1", 
"test_cluster", dateTwo, EntityType.FEED.toString());
+
+        
Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", 
"test_cluster",
+                EntityType.FEED.toString()).size(), 2);
+        monitoringJdbcStateStore.deletePendingInstance("test_feed1", 
"test_cluster", dateOne,
+                EntityType.FEED.toString());
+        
Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", 
"test_cluster",
+                EntityType.FEED.toString()).size(), 1);
+        monitoringJdbcStateStore.deletePendingInstances("test_feed1", 
"test_cluster", EntityType.FEED.toString());
     }
 
     @Test
     public void testEmptyLatestInstance() throws Exception {
         MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
-        store.putMonitoredFeed("test-feed1");
-        store.putMonitoredFeed("test-feed2");
-        Assert.assertNull(store.getLastInstanceTime("test-feed1"));
+        store.putMonitoredEntity("test-feed1", EntityType.FEED.toString());
+        store.putMonitoredEntity("test-feed2", EntityType.FEED.toString());
+        Assert.assertNull(store.getLastInstanceTime("test-feed1", 
EntityType.FEED.toString()));
 
         Date dateOne =  SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
         Date dateTwo =  SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
 
-        store.putPendingInstances("test-feed1", "test_cluster", dateTwo);
-        store.putPendingInstances("test-feed1", "test_cluster", dateOne);
-        store.putPendingInstances("test-feed2", "test_cluster", dateOne);
+        store.putPendingInstances("test-feed1", "test_cluster", dateTwo, 
EntityType.FEED.toString());
+        store.putPendingInstances("test-feed1", "test_cluster", dateOne, 
EntityType.FEED.toString());
+        store.putPendingInstances("test-feed2", "test_cluster", dateOne, 
EntityType.FEED.toString());
 
-        
Assert.assertTrue(dateTwo.equals(store.getLastInstanceTime("test-feed1")));
-        
Assert.assertTrue(dateOne.equals(store.getLastInstanceTime("test-feed2")));
+        
Assert.assertTrue(dateTwo.equals(store.getLastInstanceTime("test-feed1", 
EntityType.FEED.toString())));
+        
Assert.assertTrue(dateOne.equals(store.getLastInstanceTime("test-feed2", 
EntityType.FEED.toString())));
 
     }
 
@@ -126,14 +131,15 @@ public class MonitoringJdbcStateStoreTest extends 
AbstractTestBase {
     public void testputSLALowCandidate() throws Exception{
         MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
         Date dateOne =  SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
-        store.putSLAAlertInstance("test-feed1", "test-cluster", dateOne, 
Boolean.TRUE, Boolean.FALSE);
-        Assert.assertEquals(Boolean.TRUE, 
store.getFeedAlertInstance("test-feed1",
-                "test-cluster", dateOne).getIsSLALowMissed());
-        
Assert.assertTrue(dateOne.equals(store.getFeedAlertInstance("test-feed1",
-                "test-cluster", dateOne).getNominalTime()));
-        store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne);
-        Assert.assertEquals(Boolean.TRUE, 
store.getFeedAlertInstance("test-feed1",
-                "test-cluster", dateOne).getIsSLAHighMissed());
+        store.putSLAAlertInstance("test-feed1", "test-cluster", 
EntityType.FEED.toString(),
+                dateOne, Boolean.TRUE, Boolean.FALSE);
+        Assert.assertEquals(Boolean.TRUE, 
store.getEntityAlertInstance("test-feed1",
+                "test-cluster", dateOne, 
EntityType.FEED.toString()).getIsSLALowMissed());
+        
Assert.assertTrue(dateOne.equals(store.getEntityAlertInstance("test-feed1",
+                "test-cluster", dateOne, 
EntityType.FEED.toString()).getNominalTime()));
+        store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne, 
EntityType.FEED.toString());
+        Assert.assertEquals(Boolean.TRUE, 
store.getEntityAlertInstance("test-feed1",
+                "test-cluster", dateOne, 
EntityType.FEED.toString()).getIsSLAHighMissed());
     }
 
     @Test
@@ -141,21 +147,22 @@ public class MonitoringJdbcStateStoreTest extends 
AbstractTestBase {
         MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
         Date dateOne =  SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
 
-        store.putSLAAlertInstance("test-feed1", "test-cluster", dateOne, 
Boolean.TRUE, Boolean.FALSE);
-        store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne);
-        Assert.assertEquals(Boolean.TRUE, 
store.getFeedAlertInstance("test-feed1",
-                "test-cluster", dateOne).getIsSLAHighMissed());
+        store.putSLAAlertInstance("test-process", "test-cluster", 
EntityType.PROCESS.toString(),
+                dateOne, Boolean.TRUE, Boolean.FALSE);
+        store.updateSLAAlertInstance("test-process", "test-cluster", dateOne, 
EntityType.PROCESS.toString());
+        Assert.assertEquals(Boolean.TRUE, 
store.getEntityAlertInstance("test-process",
+                "test-cluster", dateOne, 
EntityType.PROCESS.toString()).getIsSLAHighMissed());
     }
 
     private void clear() {
         EntityManager em = FalconJPAService.get().getEntityManager();
         em.getTransaction().begin();
         try {
-            Query query = em.createNativeQuery("delete from MONITORED_FEEDS");
+            Query query = em.createNativeQuery("delete from 
PENDING_INSTANCES");
             query.executeUpdate();
-            query = em.createNativeQuery("delete from PENDING_INSTANCES");
+            query = em.createNativeQuery("delete from MONITORED_ENTITY");
             query.executeUpdate();
-            query = em.createNativeQuery("delete from FEED_SLA_ALERTS");
+            query = em.createNativeQuery("delete from ENTITY_SLA_ALERTS");
             query.executeUpdate();
 
         } finally {

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
----------------------------------------------------------------------
diff --git 
a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java 
b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
new file mode 100644
index 0000000..c8b4f5e
--- /dev/null
+++ 
b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.apache.falcon.tools.FalconStateStoreDBCLI;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.io.File;
+import java.util.Date;
+
+/**
+ * Test for EntitySLAAlertService.
+ */
+public class EntitySLAAlertServiceTest extends AbstractTestBase {
+    private static final String DB_BASE_DIR = "target/test-data/persistancedb";
+    protected static String dbLocation = DB_BASE_DIR + File.separator + 
"data.db";
+    protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
+    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + 
"out.sql";
+    protected LocalFileSystem fs = new LocalFileSystem();
+
+    private static MonitoringJdbcStateStore monitoringJdbcStateStore;
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+
+    protected int execDBCLICommands(String[] args) {
+        return new FalconStateStoreDBCLI().run(args);
+    }
+
+    public void createDB(String file) {
+        File sqlFile = new File(file);
+        String[] argsCreate = { "create", "-sqlfile", 
sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        Assert.assertEquals(0, result);
+        Assert.assertTrue(sqlFile.exists());
+
+    }
+
+    @BeforeClass
+    public void setup() throws Exception{
+        StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
+        Configuration localConf = new Configuration();
+        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+        fs.mkdirs(new Path(DB_BASE_DIR));
+        createDB(DB_SQL_FILE);
+        falconJPAService.init();
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+        monitoringJdbcStateStore = new MonitoringJdbcStateStore();
+    }
+
+    @BeforeMethod
+    public void init() {
+        clear();
+    }
+
+    private void clear() {
+        EntityManager em = FalconJPAService.get().getEntityManager();
+        em.getTransaction().begin();
+        try {
+            Query query = em.createNativeQuery("delete from 
PENDING_INSTANCES");
+            query.executeUpdate();
+            query = em.createNativeQuery("delete from MONITORED_ENTITY");
+            query.executeUpdate();
+            query = em.createNativeQuery("delete from ENTITY_SLA_ALERTS");
+            query.executeUpdate();
+
+        } finally {
+            em.getTransaction().commit();
+            em.close();
+        }
+    }
+
+    @Test
+    public static void processSLALowCandidates() throws FalconException, 
InterruptedException{
+
+        Date dateOne =  new Date(System.currentTimeMillis()-100000);
+        monitoringJdbcStateStore.putPendingInstances("test-feed", 
"test-cluster", dateOne, EntityType.FEED.toString());
+        org.apache.falcon.entity.v0.feed.Clusters cluster = new 
org.apache.falcon.entity.v0.feed.Clusters();
+        org.apache.falcon.entity.v0.cluster.Cluster cluster1 = new 
org.apache.falcon.entity.v0.cluster.Cluster();
+        cluster1.setName("test-cluster");
+        Cluster testCluster = new Cluster();
+        testCluster.setName("test-cluster");
+        cluster.getClusters().add(testCluster);
+        Feed mockEntity = new Feed();
+        mockEntity.setName("test-feed");
+        mockEntity.setClusters(cluster);
+        cluster1.setColo("test-cluster");
+
+
+        if (ConfigurationStore.get().get(EntityType.FEED, 
mockEntity.getName()) == null) {
+            ConfigurationStore.get().publish(EntityType.FEED, mockEntity);
+        }
+        if (ConfigurationStore.get().get(EntityType.CLUSTER, 
cluster1.getName()) == null) {
+            ConfigurationStore.get().publish(EntityType.CLUSTER, cluster1);
+        }
+        Sla sla = new Sla();
+        Frequency frequencyLow = new Frequency("1", 
Frequency.TimeUnit.minutes);
+        Frequency frequencyHigh = new Frequency("2", 
Frequency.TimeUnit.minutes);
+        sla.setSlaLow(frequencyLow);
+        sla.setSlaHigh(frequencyHigh);
+        mockEntity.setSla(sla);
+
+        EntitySLAAlertService.get().init();
+        Thread.sleep(10*1000);
+        
Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance("test-feed", 
"test-cluster",
+                dateOne, EntityType.FEED.toString()).getIsSLALowMissed());
+    }
+
+    @Test
+    public static void processSLACandidateProcess() throws FalconException, 
InterruptedException{
+        Date dateOne =  new Date(System.currentTimeMillis()-130000);
+
+        monitoringJdbcStateStore.putPendingInstances("test-process", 
"test-cluster", dateOne,
+                EntityType.PROCESS.name());
+        EntitySLAAlertService.get().init();
+        org.apache.falcon.entity.v0.process.Clusters cluster = new 
org.apache.falcon.entity.v0.process.Clusters();
+        org.apache.falcon.entity.v0.cluster.Cluster processCluster = new 
org.apache.falcon.entity.v0.cluster.Cluster();
+        processCluster.setName("test-cluster");
+        org.apache.falcon.entity.v0.process.Cluster testCluster = new 
org.apache.falcon.entity.v0.process.Cluster();
+        testCluster.setName("test-cluster");
+        cluster.getClusters().add(testCluster);
+        Process process =  new Process();
+        process.setName("test-process");
+        process.setClusters(cluster);
+        processCluster.setColo("test-cluster");
+
+        if (ConfigurationStore.get().get(EntityType.PROCESS, 
process.getName()) == null){
+            ConfigurationStore.get().publish(EntityType.PROCESS, process);
+        }
+        if (ConfigurationStore.get().get(EntityType.CLUSTER, 
processCluster.getName()) == null) {
+            ConfigurationStore.get().publish(EntityType.CLUSTER, 
processCluster);
+        }
+        org.apache.falcon.entity.v0.process.Sla sla = new 
org.apache.falcon.entity.v0.process.Sla();
+        Frequency processFrequency = new Frequency("1", 
Frequency.TimeUnit.minutes);
+        sla.setShouldEndIn(processFrequency);
+        process.setSla(sla);
+
+
+        Thread.sleep(10*1000);
+        
Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance("test-process",
 "test-cluster", dateOne,
+                EntityType.PROCESS.name()).getIsSLAHighMissed());
+
+    }
+
+    @Test(expectedExceptions = javax.persistence.NoResultException.class)
+    public static void processSLAHighCandidates() throws FalconException, 
InterruptedException{
+
+        Date dateOne =  new Date(System.currentTimeMillis()-130000);
+        monitoringJdbcStateStore.putPendingInstances("test-feed", 
"test-cluster", dateOne, EntityType.FEED.toString());
+        org.apache.falcon.entity.v0.feed.Clusters cluster = new 
org.apache.falcon.entity.v0.feed.Clusters();
+        org.apache.falcon.entity.v0.cluster.Cluster cluster1 = new 
org.apache.falcon.entity.v0.cluster.Cluster();
+        cluster1.setName("test-cluster");
+        Cluster testCluster = new Cluster();
+        testCluster.setName("test-cluster");
+        cluster.getClusters().add(testCluster);
+        Feed mockEntity = new Feed();
+        mockEntity.setName("test-feed");
+        mockEntity.setClusters(cluster);
+        cluster1.setColo("test-cluster");
+        if (ConfigurationStore.get().get(EntityType.FEED, 
mockEntity.getName()) == null) {
+            ConfigurationStore.get().publish(EntityType.FEED, mockEntity);
+        }
+        if (ConfigurationStore.get().get(EntityType.CLUSTER, 
cluster1.getName()) == null) {
+            ConfigurationStore.get().publish(EntityType.CLUSTER, cluster1);
+        }
+        Sla sla = new Sla();
+        Frequency frequencyLow = new Frequency("1", 
Frequency.TimeUnit.minutes);
+        Frequency frequencyHigh = new Frequency("2", 
Frequency.TimeUnit.minutes);
+        sla.setSlaLow(frequencyLow);
+        sla.setSlaHigh(frequencyHigh);
+        mockEntity.setSla(sla);
+
+        EntitySLAAlertService.get().init();
+        Thread.sleep(10*1000);
+        
Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance("test-feed", 
"test-cluster",
+                dateOne, EntityType.FEED.toString()).getIsSLAHighMissed());
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java
----------------------------------------------------------------------
diff --git 
a/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java 
b/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java
deleted file mode 100644
index 7c886c1..0000000
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.AbstractTestBase;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
-import org.apache.falcon.tools.FalconStateStoreDBCLI;
-import org.apache.falcon.util.StateStoreProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-import java.io.File;
-import java.util.Date;
-
-/**
- * Test for FeedSLAMonitoringService.
- */
-public class FeedSLAAlertServiceTest extends AbstractTestBase {
-    private static final String DB_BASE_DIR = "target/test-data/persistancedb";
-    protected static String dbLocation = DB_BASE_DIR + File.separator + 
"data.db";
-    protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
-    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + 
"out.sql";
-    protected LocalFileSystem fs = new LocalFileSystem();
-
-    private static MonitoringJdbcStateStore monitoringJdbcStateStore;
-    private static FalconJPAService falconJPAService = FalconJPAService.get();
-
-    protected int execDBCLICommands(String[] args) {
-        return new FalconStateStoreDBCLI().run(args);
-    }
-
-    public void createDB(String file) {
-        File sqlFile = new File(file);
-        String[] argsCreate = { "create", "-sqlfile", 
sqlFile.getAbsolutePath(), "-run" };
-        int result = execDBCLICommands(argsCreate);
-        Assert.assertEquals(0, result);
-        Assert.assertTrue(sqlFile.exists());
-
-    }
-
-    @BeforeClass
-    public void setup() throws Exception{
-        StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
-        Configuration localConf = new Configuration();
-        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
-        fs.mkdirs(new Path(DB_BASE_DIR));
-        createDB(DB_SQL_FILE);
-        falconJPAService.init();
-        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
-        this.conf = dfsCluster.getConf();
-        monitoringJdbcStateStore = new MonitoringJdbcStateStore();
-    }
-
-    @BeforeMethod
-    public void init() {
-        clear();
-    }
-
-    private void clear() {
-        EntityManager em = FalconJPAService.get().getEntityManager();
-        em.getTransaction().begin();
-        try {
-            Query query = em.createNativeQuery("delete from MONITORED_FEEDS");
-            query.executeUpdate();
-            query = em.createNativeQuery("delete from PENDING_INSTANCES");
-            query.executeUpdate();
-            query = em.createNativeQuery("delete from FEED_SLA_ALERTS");
-            query.executeUpdate();
-
-        } finally {
-            em.getTransaction().commit();
-            em.close();
-        }
-    }
-
-    @Test
-    public static void processSLALowCandidates() throws FalconException, 
InterruptedException{
-
-        Date dateOne =  new Date(System.currentTimeMillis()-100000);
-        monitoringJdbcStateStore.putPendingInstances("test-feed", 
"test-cluster", dateOne);
-        org.apache.falcon.entity.v0.feed.Clusters cluster = new 
org.apache.falcon.entity.v0.feed.Clusters();
-        Cluster testCluster = new Cluster();
-        testCluster.setName("test-cluster");
-        cluster.getClusters().add(testCluster);
-        Feed mockEntity = new Feed();
-        mockEntity.setName("test-feed");
-        mockEntity.setClusters(cluster);
-        if (ConfigurationStore.get().get(EntityType.FEED, 
mockEntity.getName()) == null) {
-            ConfigurationStore.get().publish(EntityType.FEED, mockEntity);
-        }
-        Sla sla = new Sla();
-        Frequency frequencyLow = new Frequency("1", 
Frequency.TimeUnit.minutes);
-        Frequency frequencyHigh = new Frequency("2", 
Frequency.TimeUnit.minutes);
-        sla.setSlaLow(frequencyLow);
-        sla.setSlaHigh(frequencyHigh);
-        mockEntity.setSla(sla);
-
-        FeedSLAAlertService.get().init();
-        Thread.sleep(10*1000);
-        
Assert.assertTrue(monitoringJdbcStateStore.getFeedAlertInstance("test-feed", 
"test-cluster",
-                dateOne).getIsSLALowMissed());
-    }
-
-    @Test(expectedExceptions = javax.persistence.NoResultException.class)
-    public static void processSLAHighCandidates() throws FalconException, 
InterruptedException{
-
-        Date dateOne =  new Date(System.currentTimeMillis()-130000);
-        monitoringJdbcStateStore.putPendingInstances("test-feed", 
"test-cluster", dateOne);
-        org.apache.falcon.entity.v0.feed.Clusters cluster = new 
org.apache.falcon.entity.v0.feed.Clusters();
-        Cluster testCluster = new Cluster();
-        testCluster.setName("test-cluster");
-        cluster.getClusters().add(testCluster);
-        Feed mockEntity = new Feed();
-        mockEntity.setName("test-feed");
-        mockEntity.setClusters(cluster);
-        if (ConfigurationStore.get().get(EntityType.FEED, 
mockEntity.getName()) == null) {
-            ConfigurationStore.get().publish(EntityType.FEED, mockEntity);
-        }
-        Sla sla = new Sla();
-        Frequency frequencyLow = new Frequency("1", 
Frequency.TimeUnit.minutes);
-        Frequency frequencyHigh = new Frequency("2", 
Frequency.TimeUnit.minutes);
-        sla.setSlaLow(frequencyLow);
-        sla.setSlaHigh(frequencyHigh);
-        mockEntity.setSla(sla);
-
-        FeedSLAAlertService.get().init();
-        Thread.sleep(10*1000);
-        
Assert.assertTrue(monitoringJdbcStateStore.getFeedAlertInstance("test-feed", 
"test-cluster",
-                dateOne).getIsSLAHighMissed());
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git 
a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java 
b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
index 97cc459..9cf50c2 100644
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -54,7 +54,7 @@ public class FeedSLAMonitoringTest extends AbstractTestBase {
     private static final String CLUSTER_NAME = "testCluster";
     private static final String FEED_NAME = "testFeed";
     private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-    private static final String TAG_CRITICAL = 
FeedSLAMonitoringService.get().TAG_CRITICAL;
+    private static final String TAG_CRITICAL = 
EntitySLAMonitoringService.get().TAG_CRITICAL;
 
     @Test
     public void testSLAStatus() throws FalconException {
@@ -74,7 +74,8 @@ public class FeedSLAMonitoringTest extends AbstractTestBase {
         missingInstances.add(SchemaHelper.parseDateUTC("2015-05-05T00:00Z")); 
// equal to end time
         missingInstances.add(SchemaHelper.parseDateUTC("2015-05-06T00:00Z")); 
// after end time
 
-        Set<Pair<Date, String>> result = 
FeedSLAMonitoringService.get().getSLAStatus(sla, start, end, missingInstances);
+        Set<Pair<Date, String>> result = 
EntitySLAMonitoringService.get().getFeedSLAStatus(sla, start, end,
+                missingInstances);
         Set<Pair<Date, String>> expected = new HashSet<>();
         expected.add(new 
Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), TAG_CRITICAL));
         expected.add(new 
Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), TAG_CRITICAL));

Reply via email to