http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java new file mode 100644 index 0000000..f931625 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -0,0 +1,644 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.service; + +import java.text.ParseException; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.Pair; +import org.apache.falcon.entity.FeedInstanceStatus; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.ProcessHelper; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.feed.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Sla; +import org.apache.falcon.entity.v0.process.Clusters; +import org.apache.falcon.expression.ExpressionHelper; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.jdbc.MonitoringJdbcStateStore; +import org.apache.falcon.persistence.MonitoredEntityBean; +import org.apache.falcon.persistence.PendingInstanceBean; +import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.resource.SchedulableEntityInstance; +import org.apache.falcon.util.DateUtil; +import org.apache.falcon.util.DeploymentUtil; +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.WorkflowEngineFactory; +import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.falcon.entity.v0.process.Process; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Service to monitor Feed SLAs. + */ +public final class EntitySLAMonitoringService implements ConfigurationChangeListener, FalconService { + private static final Logger LOG = LoggerFactory.getLogger("FeedSLA"); + + private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore(); + + private static final int ONE_MS = 1; + + private static final EntitySLAMonitoringService SERVICE = new EntitySLAMonitoringService(); + + public static final String TAG_CRITICAL = "Missed-SLA-High"; + public static final String TAG_WARN = "Missed-SLA-Low"; + + private EntitySLAMonitoringService() { + + } + + public static EntitySLAMonitoringService get() { + return SERVICE; + } + + /** + * Permissions for storePath. + */ + private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); + + + /** + * Frequency in seconds of "status check" for pending feed instances. + */ + private int statusCheckFrequencySeconds; // 10 minutes + + + /** + * Time Duration (in milliseconds) in future for generating pending feed instances. + * + * In every cycle pending feed instances are added for monitoring, till this time in future. + */ + private int lookAheadWindowMillis; // 15 MINUTES + + + /** + * Filesystem used for serializing and deserializing. + */ + private FileSystem fileSystem; + + /** + * Working directory for the feed sla monitoring service. + */ + private Path storePath; + + /** + * Path to store the state of the monitoring service. + */ + private Path filePath; + + @Override + public void onAdd(Entity entity) throws FalconException { + Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); + if (entity.getEntityType() == EntityType.FEED) { + Feed feed = (Feed) entity; + // currently sla service is enabled only for fileSystemStorage + if (feed.getLocations() != null || feed.getSla() != null || checkFeedClusterSLA(feed)) { + for (Cluster cluster : feed.getClusters().getClusters()) { + if (currentClusters.contains(cluster.getName())) { + if (FeedHelper.getSLA(cluster, feed) != null) { + LOG.debug("Adding feed:{} for monitoring", feed.getName()); + MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), EntityType.FEED.toString()); + break; + } + } + } + } + } + if (entity.getEntityType() == EntityType.PROCESS){ + Process process = (Process) entity; + if (process.getSla() != null || checkProcessClusterSLA(process)){ + for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { + if (currentClusters.contains(cluster.getName())) { + LOG.debug("Adding process:{} for monitoring", process.getName()); + MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(), + EntityType.PROCESS.toString()); + break; + } + } + } + } + } + + public Boolean checkFeedClusterSLA(Feed feed){ + for(Cluster cluster : feed.getClusters().getClusters()){ + Sla sla = FeedHelper.getSLA(cluster, feed); + if (sla != null){ + return true; + } + } + return false; + } + + + public Boolean checkProcessClusterSLA(Process process){ + Clusters clusters = process.getClusters(); + for(org.apache.falcon.entity.v0.process.Cluster cluster : clusters.getClusters()){ + org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(cluster, process); + if (sla != null){ + return true; + } + } + return false; + } + + @Override + public void onRemove(Entity entity) throws FalconException { + Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); + if (entity.getEntityType() == EntityType.FEED) { + Feed feed = (Feed) entity; + // currently sla service is enabled only for fileSystemStorage + if (feed.getLocations() != null) { + for (Cluster cluster : feed.getClusters().getClusters()) { + if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { + MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(), EntityType.FEED.toString()); + MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName(), + EntityType.FEED.toString()); + } + } + } + } + if (entity.getEntityType() == EntityType.PROCESS){ + Process process = (Process) entity; + if (process.getSla() != null){ + for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { + if (currentClusters.contains(cluster.getName())) { + MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(process.getName(), + EntityType.PROCESS.toString()); + MONITORING_JDBC_STATE_STORE.deletePendingInstances(process.getName(), cluster.getName(), + EntityType.PROCESS.toString()); + } + } + } + } + } + + private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) { + if (feed.getLocations() != null) { + Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); + for (Cluster cluster : feed.getClusters().getClusters()) { + if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { + return true; + } + } + } + return false; + } + + private boolean isSLAMonitoringEnabledInCurrentColo(Process process) { + + Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); + for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { + if (currentClusters.contains(cluster.getName()) && ProcessHelper.getSLA(cluster, process) != null) { + return true; + } + } + return false; + } + + @Override + public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { + if (newEntity.getEntityType() == EntityType.FEED) { + Feed oldFeed = (Feed) oldEntity; + Feed newFeed = (Feed) newEntity; + if (!isSLAMonitoringEnabledInCurrentColo(newFeed)) { + onRemove(oldFeed); + } else if (!isSLAMonitoringEnabledInCurrentColo(oldFeed)) { + onAdd(newFeed); + } else { + List<String> slaRemovedClusters = new ArrayList<>(); + for (String oldCluster : EntityUtil.getClustersDefinedInColos(oldFeed)) { + if (FeedHelper.getSLA(oldCluster, oldFeed) != null + && FeedHelper.getSLA(oldCluster, newFeed) == null) { + slaRemovedClusters.add(oldCluster); + } + } + updatePendingInstances(newFeed.getName(), slaRemovedClusters, EntityType.FEED.toString()); + } + } + if (newEntity.getEntityType() == EntityType.PROCESS) { + Process oldProcess = (Process) oldEntity; + Process newProcess = (Process) newEntity; + if (!isSLAMonitoringEnabledInCurrentColo(oldProcess)){ + onRemove(newProcess); + } else if (!isSLAMonitoringEnabledInCurrentColo(newProcess)){ + onAdd(newProcess); + } else { + List<String> slaRemovedClusters = new ArrayList<>(); + for (String oldCluster : EntityUtil.getClustersDefined(oldProcess)){ + if (ProcessHelper.getSLA(oldCluster, oldProcess) != null + && ProcessHelper.getSLA(oldCluster, newProcess) == null){ + slaRemovedClusters.add(oldCluster); + } + } + updatePendingInstances(newProcess.getName(), slaRemovedClusters, EntityType.PROCESS.toString()); + } + } + } + + void updatePendingInstances(String entityName, List<String> slaRemovedClusters , String entityType){ + for(String clusterName :slaRemovedClusters){ + MONITORING_JDBC_STATE_STORE.deletePendingInstances(entityName, clusterName, + entityType); + } + } + + @Override + public void onReload(Entity entity) throws FalconException { + onAdd(entity); + } + + @Override + public String getName() { + return EntitySLAMonitoringService.class.getSimpleName(); + } + + @Override + public void init() throws FalconException { + String uri = StartupProperties.get().getProperty("feed.sla.service.store.uri"); + storePath = new Path(uri); + filePath = new Path(storePath, "feedSLAMonitoringService"); + fileSystem = initializeFileSystem(); + + String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600"); + statusCheckFrequencySeconds = Integer.parseInt(freq); + + freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000"); + lookAheadWindowMillis = Integer.parseInt(freq); + LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString()); + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS); + } + + public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime) + throws FalconException { + LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName, + clusterName, nominalTime); + List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName, + EntityType.FEED.toString())); + // Slas for feeds not having sla tag are not stored. + if (CollectionUtils.isEmpty(instances)){ + MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime, + EntityType.FEED.toString()); + } + } + + private FileSystem initializeFileSystem() { + try { + fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri()); + if (!fileSystem.exists(storePath)) { + LOG.info("Creating directory for pending feed instances: {}", storePath); + // set permissions so config store dir is owned by falcon alone + HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION); + } + return fileSystem; + } catch (Exception e) { + throw new RuntimeException("Unable to bring up feed sla store for path: " + storePath, e); + } + } + + @Override + public void destroy() throws FalconException { + } + + //Periodically update status of pending instances, add new instances and take backup. + private class Monitor implements Runnable { + + @Override + public void run() { + try { + if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) { + checkPendingInstanceAvailability(EntityType.FEED.toString()); + checkPendingInstanceAvailability(EntityType.PROCESS.toString()); + + // add Instances from last checked time to 10 minutes from now(some buffer for status check) + Date now = new Date(); + Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis); + addNewPendingFeedInstances(newCheckPoint, EntityType.FEED.toString()); + addNewPendingFeedInstances(newCheckPoint, EntityType.PROCESS.toString()); + } + } catch (Throwable e) { + LOG.error("Feed SLA monitoring failed: ", e); + } + } + } + + + void addNewPendingFeedInstances(Date to, String entityType) throws FalconException { + Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); + List<MonitoredEntityBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed(); + for(MonitoredEntityBean monitoredEntityBean : feedsBeanList) { + String entityName = monitoredEntityBean.getFeedName(); + Entity entity = EntityUtil.getEntity(entityType, entityName); + Set<String> clusters = EntityUtil.getClustersDefined(entity); + List<org.apache.falcon.entity.v0.cluster.Cluster> cluster = new ArrayList(); + for(String string : clusters){ + cluster.add(ClusterHelper.getCluster(string)); + } + for (org.apache.falcon.entity.v0.cluster.Cluster feedCluster : cluster) { + if (currentClusters.contains(feedCluster.getName())) { + // get start of instances from the database + Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(entityName, + EntityType.FEED.toString()); + Pair<String, String> key = new Pair<>(entity.getName(), feedCluster.getName()); + if (nextInstanceTime == null) { + nextInstanceTime = getInitialStartTime(entity, feedCluster.getName(), entityType); + } else { + nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); + } + + Set<Date> instances = new HashSet<>(); + org.apache.falcon.entity.v0.cluster.Cluster currentCluster = + EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName()); + nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime); + Date endDate; + if (entityType.equals(EntityType.FEED.toString())){ + endDate = FeedHelper.getClusterValidity((Feed) entity, currentCluster.getName()).getEnd(); + }else { + endDate = ProcessHelper.getClusterValidity((Process) entity, + currentCluster.getName()).getEnd(); + } + while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) { + LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key); + instances.add(nextInstanceTime); + nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); + nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime); + } + + for(Date date:instances){ + MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), feedCluster.getName(), date, + entityType); + } + } + } + } + } + + + /** + * Checks the availability of all the pendingInstances and removes the ones which have become available. + */ + private void checkPendingInstanceAvailability(String entityType) throws FalconException { + for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){ + for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(), + pendingInstanceBean.getClusterName(), entityType)) { + boolean status = checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(), + pendingInstanceBean.getClusterName(), date, entityType); + if (status) { + MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getEntityName(), + pendingInstanceBean.getClusterName(), date, EntityType.FEED.toString()); + } + } + } + } + + // checks whether a given feed instance is available or not + private boolean checkEntityInstanceAvailability(String entityName, String clusterName, Date nominalTime, + String entityType) throws + FalconException { + Entity entity = EntityUtil.getEntity(entityType, entityName); + + try { + if (entityType.equals(EntityType.PROCESS.toString())){ + LOG.debug("Checking instance availability status for entity:{}, cluster:{}, " + + "instanceTime:{}", entity.getName(), clusterName, nominalTime, entityType); + AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine(); + InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime, nominalTime, null, null); + if (instancesResult.getStatus().equals(APIResult.Status.SUCCEEDED)){ + LOG.debug("Entity instance(feed:{}, cluster:{}, instanceTime:{}) is available.", entity.getName(), + clusterName, nominalTime); + return true; + } + return false; + } + if (entityType.equals(EntityType.FEED.toString())){ + LOG.debug("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}", + entity.getName(), clusterName, nominalTime); + + FeedInstanceStatus.AvailabilityStatus status = FeedHelper.getFeedInstanceStatus((Feed) entity, + clusterName, nominalTime); + if (status.equals(FeedInstanceStatus.AvailabilityStatus.AVAILABLE) + || status.equals(FeedInstanceStatus.AvailabilityStatus.EMPTY)) { + LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is available.", entity.getName(), + clusterName, nominalTime); + return true; + } + } + } catch (Throwable e) { + LOG.error("Couldn't find status for Entity:{}, cluster:{}, entityType{}", entityName, clusterName, + entityType, e); + } + LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not available.", entity.getName(), + clusterName, nominalTime); + return false; + } + + + /** + * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed + * slaLow or slaHigh. + * + * Only feeds which have defined sla in their definition are considered. + * Only the feed instances between the given time range are considered. + * Start time and end time are both inclusive. + * @param start start time, inclusive + * @param end end time, inclusive + * @return Set of pending feed instances belonging to the given range which have missed SLA + * @throws FalconException + */ + public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(Date start, Date end) + throws FalconException { + Set<SchedulableEntityInstance> result = new HashSet<>(); + for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){ + Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getEntityName(), + pendingInstanceBean.getClusterName()); + Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first); + Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); + Sla sla = FeedHelper.getSLA(cluster, feed); + if (sla != null) { + Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end, + MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(), + pendingInstanceBean.getClusterName(), EntityType.FEED.toString())); + for (Pair<Date, String> status : slaStatus) { + SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first, + feedClusterPair.second, status.first, EntityType.FEED); + instance.setTags(status.second); + result.add(instance); + } + } + } + return result; + } + + /** + * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances of a given feed between the given time range + * which missed sla.Only those instances are included which have missed either slaLow or slaHigh. + * @param entityName name of the feed + * @param clusterName cluster name + * @param start start time, inclusive + * @param end end time, inclusive + * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA. + * @throws FalconException + */ + public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName, + Date start, Date end, String entityType) throws FalconException { + + Set<SchedulableEntityInstance> result = new HashSet<>(); + List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName, + entityType); + if (missingInstances == null || !Arrays.asList(EntityType.FEED.toString(), + EntityType.PROCESS.toString()).contains(entityType)){ + return result; + } + Entity entity = EntityUtil.getEntity(entityType, entityName); + + if (entityType.equals(EntityType.FEED.toString())) { + Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity); + + if (sla != null) { + Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end, missingInstances); + for (Pair<Date, String> status : slaStatus){ + SchedulableEntityInstance instance = new SchedulableEntityInstance(entityName, clusterName, + status.first, EntityType.FEED); + instance.setTags(status.second); + result.add(instance); + } + } + return result; + } else { + org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(clusterName, (Process) entity); + if (sla != null){ + Set<Pair<Date, String>> slaStatus = getProcessSLAStatus(sla, start, end, missingInstances); + for (Pair<Date, String> status : slaStatus){ + SchedulableEntityInstance instance = new SchedulableEntityInstance(entityName, clusterName, + status.first, EntityType.PROCESS); + instance.setTags(status.second); + result.add(instance); + } + } + } + return result; + } + + Set<Pair<Date, String>> getFeedSLAStatus(Sla sla, Date start, Date end, List<Date> missingInstances) + throws FalconException { + Date now = new Date(); + Frequency slaLow = sla.getSlaLow(); + Frequency slaHigh = sla.getSlaHigh(); + Set<Pair<Date, String>> result = new HashSet<>(); + for (Date nominalTime : missingInstances) { + if (!nominalTime.before(start) && !nominalTime.after(end)) { + ExpressionHelper.setReferenceDate(nominalTime); + ExpressionHelper evaluator = ExpressionHelper.get(); + Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class); + Long slaLowDuration = evaluator.evaluate(slaLow.toString(), Long.class); + Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration); + Date slaWarnTime = new Date(nominalTime.getTime() + slaLowDuration); + if (slaCriticalTime.before(now)) { + result.add(new Pair<>(nominalTime, TAG_CRITICAL)); + } else if (slaWarnTime.before(now)) { + result.add(new Pair<>(nominalTime, TAG_WARN)); + } + } + } + return result; + } + + Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start, + Date end, List<Date> missingInstances) throws FalconException { + Date now = new Date(); + Frequency slaHigh = sla.getShouldEndIn(); + Set<Pair<Date, String>> result = new HashSet<>(); + for (Date nominalTime : missingInstances) { + if (!nominalTime.before(start) && !nominalTime.after(end)) { + ExpressionHelper.setReferenceDate(nominalTime); + ExpressionHelper evaluator = ExpressionHelper.get(); + Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class); + Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration); + if (slaCriticalTime.before(now)) { + result.add(new Pair<>(nominalTime, TAG_CRITICAL)); + } + } + } + return result; + } + + @VisibleForTesting + Date getInitialStartTime(Entity entity, String clusterName, String entityType) throws FalconException { + if (entityType.equals(EntityType.PROCESS.toString())){ + Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity); + if (sla == null) { + throw new IllegalStateException("InitialStartTime can not be determined as the feed: " + + entity.getName() + " and cluster: " + clusterName + " does not have any sla"); + } + Date startTime = FeedHelper.getFeedValidityStart((Feed) entity, clusterName); + Frequency slaLow = sla.getSlaLow(); + Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow)); + return startTime.before(slaTime) ? startTime : slaTime; + } else{ + org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(clusterName, (Process)entity); + if (sla == null) { + throw new IllegalStateException("InitialStartTime can not be determined as the feed: " + + entity.getName() + " and cluster: " + clusterName + " does not have any sla"); + } + Date startTime = ProcessHelper.getProcessValidityStart((Process) entity, clusterName); + Frequency slaLow = sla.getShouldEndIn(); + Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow)); + return startTime.before(slaTime) ? startTime : slaTime; + } + } + + public void makeProcessInstanceAvailable(String clusterName, String entityName, String date, String entityType) { + Date nominalTime = null; + try { + nominalTime = DateUtil.parseDateFalconTZ(date); + }catch (ParseException e){ + LOG.error("Exception while translating the date:", e); + } + if (nominalTime!= null){ + List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName, + entityType)); + if (!CollectionUtils.isEmpty(instances)){ + MONITORING_JDBC_STATE_STORE.deletePendingInstance(entityName, clusterName, nominalTime, + entityType); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java deleted file mode 100644 index c09c7ae..0000000 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.service; - - -import java.util.Date; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.ArrayList; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.feed.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.jdbc.MonitoringJdbcStateStore; -import org.apache.falcon.persistence.PendingInstanceBean; -import org.apache.falcon.resource.SchedulableEntityInstance; -import org.apache.falcon.util.ReflectionUtils; -import org.apache.falcon.util.StartupProperties; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Service to know which all feeds have missed SLA. - */ -public final class FeedSLAAlertService implements FalconService, EntitySLAListener { - - private static final String NAME = "FeedSLAAlertService"; - - private static final Logger LOG = LoggerFactory.getLogger(FeedSLAAlertService.class); - - private MonitoringJdbcStateStore store = new MonitoringJdbcStateStore(); - - private Set<EntitySLAListener> listeners = new LinkedHashSet<EntitySLAListener>(); - - private static final FeedSLAAlertService SERVICE = new FeedSLAAlertService(); - - public static FeedSLAAlertService get() { - return SERVICE; - } - - private FeedSLAAlertService(){} - - - @Override - public String getName() { - return NAME; - } - - @Override - public void init() throws FalconException { - String listenerClassNames = StartupProperties.get(). - getProperty("feedAlert.listeners"); - for (String listenerClassName : listenerClassNames.split(",")) { - listenerClassName = listenerClassName.trim(); - if (listenerClassName.isEmpty()) { - continue; - } - EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName); - registerListener(listener); - } - - String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600"); - int statusCheckFrequencySeconds = Integer.parseInt(freq); - - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds + 10, TimeUnit.SECONDS); - } - - public void registerListener(EntitySLAListener listener) { - listeners.add(listener); - } - - @Override - public void destroy() throws FalconException { - - } - - - private class Monitor implements Runnable { - - @Override - public void run() { - processSLACandidates(); - } - } - - void processSLACandidates(){ - //Get all feeds instances to be monitored - List<PendingInstanceBean> pendingInstanceBeanList = store.getAllInstances(); - if (pendingInstanceBeanList.isEmpty()){ - return; - } - - LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size()); - try{ - for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) { - - String feedName = pendingInstanceBean.getFeedName(); - String clusterName = pendingInstanceBean.getClusterName(); - Date nominalTime = pendingInstanceBean.getNominalTime(); - Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName); - - Cluster cluster = FeedHelper.getCluster(feed, clusterName); - - Set<SchedulableEntityInstance> schedulableEntityInstances= FeedSLAMonitoringService.get(). - getFeedSLAMissPendingAlerts(feed.getName(), cluster.getName(), nominalTime, nominalTime); - if (schedulableEntityInstances.isEmpty()){ - store.deleteFeedAlertInstance(feed.getName(), cluster.getName(), nominalTime); - return; - } - List<SchedulableEntityInstance> schedulableEntityList = new ArrayList<>(schedulableEntityInstances); - SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0); - - - if (schedulableEntityInstance.getTags().contains(FeedSLAMonitoringService.get().TAG_WARN)) { - store.putSLAAlertInstance(feedName, clusterName, nominalTime, true, false); - //Mark in DB as SLA missed - LOG.info("Feed :"+ feedName - + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLALow"); - } else if (schedulableEntityInstance.getTags().contains(FeedSLAMonitoringService.get().TAG_CRITICAL)){ - store.updateSLAAlertInstance(feedName, clusterName, nominalTime); - LOG.info("Feed :"+ feedName - + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLAHigh"); - highSLAMissed(feedName, EntityType.FEED, clusterName, nominalTime); - } - } - } catch (FalconException e){ - LOG.error("Exception in FeedSLAALertService:", e); - } - - } - - @Override - public void highSLAMissed(String feedName, EntityType entityType, String clusterName, Date nominalTime) - throws FalconException{ - for (EntitySLAListener listener : listeners) { - listener.highSLAMissed(feedName, entityType, clusterName, nominalTime); - } - store.deleteFeedAlertInstance(feedName, clusterName, nominalTime); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java deleted file mode 100644 index ed7bb08..0000000 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ /dev/null @@ -1,450 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.service; - -import java.util.ArrayList; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.Pair; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.FeedHelper; -import org.apache.falcon.entity.FeedInstanceStatus; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.feed.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Sla; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.jdbc.MonitoringJdbcStateStore; -import org.apache.falcon.persistence.MonitoredFeedsBean; -import org.apache.falcon.persistence.PendingInstanceBean; -import org.apache.falcon.resource.SchedulableEntityInstance; -import org.apache.falcon.util.DateUtil; -import org.apache.falcon.util.DeploymentUtil; -import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Service to monitor Feed SLAs. - */ -public final class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService { - private static final Logger LOG = LoggerFactory.getLogger("FeedSLA"); - - private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore(); - - private static final int ONE_MS = 1; - - private static final FeedSLAMonitoringService SERVICE = new FeedSLAMonitoringService(); - - public static final String TAG_CRITICAL = "Missed-SLA-High"; - public static final String TAG_WARN = "Missed-SLA-Low"; - - private FeedSLAMonitoringService() { - - } - - public static FeedSLAMonitoringService get() { - return SERVICE; - } - - /** - * Permissions for storePath. - */ - private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); - - - /** - * Frequency in seconds of "status check" for pending feed instances. - */ - private int statusCheckFrequencySeconds; // 10 minutes - - - /** - * Time Duration (in milliseconds) in future for generating pending feed instances. - * - * In every cycle pending feed instances are added for monitoring, till this time in future. - */ - private int lookAheadWindowMillis; // 15 MINUTES - - - /** - * Filesystem used for serializing and deserializing. - */ - private FileSystem fileSystem; - - /** - * Working directory for the feed sla monitoring service. - */ - private Path storePath; - - /** - * Path to store the state of the monitoring service. - */ - private Path filePath; - - @Override - public void onAdd(Entity entity) throws FalconException { - if (entity.getEntityType() == EntityType.FEED) { - Feed feed = (Feed) entity; - // currently sla service is enabled only for fileSystemStorage - if (feed.getLocations() != null) { - Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); - for (Cluster cluster : feed.getClusters().getClusters()) { - if (currentClusters.contains(cluster.getName())) { - if (FeedHelper.getSLA(cluster, feed) != null) { - LOG.debug("Adding feed:{} for monitoring", feed.getName()); - MONITORING_JDBC_STATE_STORE.putMonitoredFeed(feed.getName()); - } - } - } - } - } - } - - @Override - public void onRemove(Entity entity) throws FalconException { - if (entity.getEntityType() == EntityType.FEED) { - Feed feed = (Feed) entity; - // currently sla service is enabled only for fileSystemStorage - if (feed.getLocations() != null) { - Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); - for (Cluster cluster : feed.getClusters().getClusters()) { - if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { - MONITORING_JDBC_STATE_STORE.deleteMonitoringFeed(feed.getName()); - MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName()); - } - } - } - } - } - - private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) { - if (feed.getLocations() != null) { - Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); - for (Cluster cluster : feed.getClusters().getClusters()) { - if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { - return true; - } - } - } - return false; - } - - @Override - public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { - if (newEntity.getEntityType() == EntityType.FEED) { - Feed oldFeed = (Feed) oldEntity; - Feed newFeed = (Feed) newEntity; - if (!isSLAMonitoringEnabledInCurrentColo(newFeed)) { - onRemove(oldFeed); - } else if (!isSLAMonitoringEnabledInCurrentColo(oldFeed)) { - onAdd(newFeed); - } else { - List<String> slaRemovedClusters = new ArrayList<>(); - for (String oldCluster : EntityUtil.getClustersDefinedInColos(oldFeed)) { - if (FeedHelper.getSLA(oldCluster, oldFeed) != null - && FeedHelper.getSLA(oldCluster, newFeed) == null) { - slaRemovedClusters.add(oldCluster); - } - } - - for (String clusterName : slaRemovedClusters) { - MONITORING_JDBC_STATE_STORE.deletePendingInstances(newFeed.getName(), clusterName); - } - } - } - } - - @Override - public void onReload(Entity entity) throws FalconException { - onAdd(entity); - } - - @Override - public String getName() { - return FeedSLAMonitoringService.class.getSimpleName(); - } - - @Override - public void init() throws FalconException { - String uri = StartupProperties.get().getProperty("feed.sla.service.store.uri"); - storePath = new Path(uri); - filePath = new Path(storePath, "feedSLAMonitoringService"); - fileSystem = initializeFileSystem(); - - String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600"); - statusCheckFrequencySeconds = Integer.parseInt(freq); - - freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000"); - lookAheadWindowMillis = Integer.parseInt(freq); - LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString()); - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS); - } - - public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime) - throws FalconException { - LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName, - clusterName, nominalTime); - List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName)); - // Slas for feeds not having sla tag are not stored. - if (CollectionUtils.isEmpty(instances)){ - MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime); - } - } - - private FileSystem initializeFileSystem() { - try { - fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri()); - if (!fileSystem.exists(storePath)) { - LOG.info("Creating directory for pending feed instances: {}", storePath); - // set permissions so config store dir is owned by falcon alone - HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION); - } - return fileSystem; - } catch (Exception e) { - throw new RuntimeException("Unable to bring up feed sla store for path: " + storePath, e); - } - } - - @Override - public void destroy() throws FalconException { - } - - //Periodically update status of pending instances, add new instances and take backup. - private class Monitor implements Runnable { - - @Override - public void run() { - try { - if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) { - checkPendingInstanceAvailability(); - - // add Instances from last checked time to 10 minutes from now(some buffer for status check) - Date now = new Date(); - Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis); - addNewPendingFeedInstances(newCheckPoint); - } - } catch (Throwable e) { - LOG.error("Feed SLA monitoring failed: ", e); - } - } - } - - - void addNewPendingFeedInstances(Date to) throws FalconException { - Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); - List<MonitoredFeedsBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed(); - for(MonitoredFeedsBean monitoredFeedsBean : feedsBeanList) { - String feedName = monitoredFeedsBean.getFeedName(); - Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); - for (Cluster feedCluster : feed.getClusters().getClusters()) { - if (currentClusters.contains(feedCluster.getName())) { - // get start of instances from the database - Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(feedName); - Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName()); - if (nextInstanceTime == null) { - nextInstanceTime = getInitialStartTime(feed, feedCluster.getName()); - } else { - nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); - } - - Set<Date> instances = new HashSet<>(); - org.apache.falcon.entity.v0.cluster.Cluster currentCluster = - EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName()); - nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime); - Date endDate = FeedHelper.getClusterValidity(feed, currentCluster.getName()).getEnd(); - while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) { - LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key); - instances.add(nextInstanceTime); - nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); - nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime); - } - - for(Date date:instances){ - MONITORING_JDBC_STATE_STORE.putPendingInstances(feed.getName(), feedCluster.getName(), date); - } - } - } - } - } - - - /** - * Checks the availability of all the pendingInstances and removes the ones which have become available. - */ - private void checkPendingInstanceAvailability() throws FalconException { - for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){ - for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(), - pendingInstanceBean.getClusterName())) { - boolean status = checkFeedInstanceAvailability(pendingInstanceBean.getFeedName(), - pendingInstanceBean.getClusterName(), date); - if (status) { - MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getFeedName(), - pendingInstanceBean.getClusterName(), date); - } - } - } - } - - // checks whether a given feed instance is available or not - private boolean checkFeedInstanceAvailability(String feedName, String clusterName, Date nominalTime) throws - FalconException { - Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); - - try { - LOG.debug("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}", feed.getName(), - clusterName, nominalTime); - FeedInstanceStatus.AvailabilityStatus status = FeedHelper.getFeedInstanceStatus(feed, clusterName, - nominalTime); - if (status.equals(FeedInstanceStatus.AvailabilityStatus.AVAILABLE) - || status.equals(FeedInstanceStatus.AvailabilityStatus.EMPTY)) { - LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is available.", feed.getName(), - clusterName, nominalTime); - return true; - } - } catch (Throwable e) { - LOG.error("Couldn't find status for feed:{}, cluster:{}", feedName, clusterName, e); - } - LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not available.", feed.getName(), - clusterName, nominalTime); - return false; - } - - - /** - * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed - * slaLow or slaHigh. - * - * Only feeds which have defined sla in their definition are considered. - * Only the feed instances between the given time range are considered. - * Start time and end time are both inclusive. - * @param start start time, inclusive - * @param end end time, inclusive - * @return Set of pending feed instances belonging to the given range which have missed SLA - * @throws FalconException - */ - public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end) - throws FalconException { - Set<SchedulableEntityInstance> result = new HashSet<>(); - for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){ - Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getFeedName(), - pendingInstanceBean.getClusterName()); - Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first); - Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); - Sla sla = FeedHelper.getSLA(cluster, feed); - if (sla != null) { - Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, - MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(), - pendingInstanceBean.getClusterName())); - for (Pair<Date, String> status : slaStatus) { - SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first, - feedClusterPair.second, status.first, EntityType.FEED); - instance.setTags(status.second); - result.add(instance); - } - } - } - return result; - } - - /** - * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances of a given feed between the given time range - * which missed sla.Only those instances are included which have missed either slaLow or slaHigh. - * @param feedName name of the feed - * @param clusterName cluster name - * @param start start time, inclusive - * @param end end time, inclusive - * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA. - * @throws FalconException - */ - public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(String feedName, String clusterName, - Date start, Date end) throws FalconException { - - Set<SchedulableEntityInstance> result = new HashSet<>(); - Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName); - List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName); - Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName); - Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second); - Sla sla = FeedHelper.getSLA(cluster, feed); - if (missingInstances != null && sla != null) { - Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, missingInstances); - for (Pair<Date, String> status : slaStatus){ - SchedulableEntityInstance instance = new SchedulableEntityInstance(feedName, clusterName, status.first, - EntityType.FEED); - instance.setTags(status.second); - result.add(instance); - } - } - return result; - } - - Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, List<Date> missingInstances) - throws FalconException { - Date now = new Date(); - Frequency slaLow = sla.getSlaLow(); - Frequency slaHigh = sla.getSlaHigh(); - Set<Pair<Date, String>> result = new HashSet<>(); - for (Date nominalTime : missingInstances) { - if (!nominalTime.before(start) && !nominalTime.after(end)) { - ExpressionHelper.setReferenceDate(nominalTime); - ExpressionHelper evaluator = ExpressionHelper.get(); - Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class); - Long slaLowDuration = evaluator.evaluate(slaLow.toString(), Long.class); - Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration); - Date slaWarnTime = new Date(nominalTime.getTime() + slaLowDuration); - if (slaCriticalTime.before(now)) { - result.add(new Pair<>(nominalTime, TAG_CRITICAL)); - } else if (slaWarnTime.before(now)) { - result.add(new Pair<>(nominalTime, TAG_WARN)); - } - } - } - return result; - } - - @VisibleForTesting - Date getInitialStartTime(Feed feed, String clusterName) throws FalconException { - Sla sla = FeedHelper.getSLA(clusterName, feed); - if (sla == null) { - throw new IllegalStateException("InitialStartTime can not be determined as the feed: " - + feed.getName() + " and cluster: " + clusterName + " does not have any sla"); - } - Date startTime = FeedHelper.getFeedValidityStart(feed, clusterName); - Frequency slaLow = sla.getSlaLow(); - Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow)); - return startTime.before(slaTime) ? startTime : slaTime; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java index a4a95be..8cf2b2d 100644 --- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -25,6 +25,7 @@ import javax.persistence.Query; import org.apache.falcon.cluster.util.EmbeddedCluster; import org.apache.falcon.entity.AbstractTestBase; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.tools.FalconStateStoreDBCLI; @@ -85,40 +86,44 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { @Test public void testInsertRetrieveAndUpdate() throws Exception { - monitoringJdbcStateStore.putMonitoredFeed("test_feed1"); - monitoringJdbcStateStore.putMonitoredFeed("test_feed2"); - Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName()); + monitoringJdbcStateStore.putMonitoredEntity("test_feed1", EntityType.FEED.toString()); + monitoringJdbcStateStore.putMonitoredEntity("test_feed2", EntityType.FEED.toString()); + Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredEntity("test_feed1", + EntityType.FEED.toString()).getFeedName()); Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(), 2); - monitoringJdbcStateStore.deleteMonitoringFeed("test_feed1"); - monitoringJdbcStateStore.deleteMonitoringFeed("test_feed2"); + monitoringJdbcStateStore.deleteMonitoringEntity("test_feed1", EntityType.FEED.toString()); + monitoringJdbcStateStore.deleteMonitoringEntity("test_feed2", EntityType.FEED.toString()); Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); Date dateTwo = SchemaHelper.parseDateUTC("2015-11-20T01:00Z"); - monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateOne); - monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateTwo); - - Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 2); - monitoringJdbcStateStore.deletePendingInstance("test_feed1", "test_cluster", dateOne); - Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 1); - monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster"); + monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateOne, EntityType.FEED.toString()); + monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateTwo, EntityType.FEED.toString()); + + Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster", + EntityType.FEED.toString()).size(), 2); + monitoringJdbcStateStore.deletePendingInstance("test_feed1", "test_cluster", dateOne, + EntityType.FEED.toString()); + Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster", + EntityType.FEED.toString()).size(), 1); + monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster", EntityType.FEED.toString()); } @Test public void testEmptyLatestInstance() throws Exception { MonitoringJdbcStateStore store = new MonitoringJdbcStateStore(); - store.putMonitoredFeed("test-feed1"); - store.putMonitoredFeed("test-feed2"); - Assert.assertNull(store.getLastInstanceTime("test-feed1")); + store.putMonitoredEntity("test-feed1", EntityType.FEED.toString()); + store.putMonitoredEntity("test-feed2", EntityType.FEED.toString()); + Assert.assertNull(store.getLastInstanceTime("test-feed1", EntityType.FEED.toString())); Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); Date dateTwo = SchemaHelper.parseDateUTC("2015-11-20T01:00Z"); - store.putPendingInstances("test-feed1", "test_cluster", dateTwo); - store.putPendingInstances("test-feed1", "test_cluster", dateOne); - store.putPendingInstances("test-feed2", "test_cluster", dateOne); + store.putPendingInstances("test-feed1", "test_cluster", dateTwo, EntityType.FEED.toString()); + store.putPendingInstances("test-feed1", "test_cluster", dateOne, EntityType.FEED.toString()); + store.putPendingInstances("test-feed2", "test_cluster", dateOne, EntityType.FEED.toString()); - Assert.assertTrue(dateTwo.equals(store.getLastInstanceTime("test-feed1"))); - Assert.assertTrue(dateOne.equals(store.getLastInstanceTime("test-feed2"))); + Assert.assertTrue(dateTwo.equals(store.getLastInstanceTime("test-feed1", EntityType.FEED.toString()))); + Assert.assertTrue(dateOne.equals(store.getLastInstanceTime("test-feed2", EntityType.FEED.toString()))); } @@ -126,14 +131,15 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { public void testputSLALowCandidate() throws Exception{ MonitoringJdbcStateStore store = new MonitoringJdbcStateStore(); Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); - store.putSLAAlertInstance("test-feed1", "test-cluster", dateOne, Boolean.TRUE, Boolean.FALSE); - Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1", - "test-cluster", dateOne).getIsSLALowMissed()); - Assert.assertTrue(dateOne.equals(store.getFeedAlertInstance("test-feed1", - "test-cluster", dateOne).getNominalTime())); - store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne); - Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1", - "test-cluster", dateOne).getIsSLAHighMissed()); + store.putSLAAlertInstance("test-feed1", "test-cluster", EntityType.FEED.toString(), + dateOne, Boolean.TRUE, Boolean.FALSE); + Assert.assertEquals(Boolean.TRUE, store.getEntityAlertInstance("test-feed1", + "test-cluster", dateOne, EntityType.FEED.toString()).getIsSLALowMissed()); + Assert.assertTrue(dateOne.equals(store.getEntityAlertInstance("test-feed1", + "test-cluster", dateOne, EntityType.FEED.toString()).getNominalTime())); + store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne, EntityType.FEED.toString()); + Assert.assertEquals(Boolean.TRUE, store.getEntityAlertInstance("test-feed1", + "test-cluster", dateOne, EntityType.FEED.toString()).getIsSLAHighMissed()); } @Test @@ -141,21 +147,22 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { MonitoringJdbcStateStore store = new MonitoringJdbcStateStore(); Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); - store.putSLAAlertInstance("test-feed1", "test-cluster", dateOne, Boolean.TRUE, Boolean.FALSE); - store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne); - Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1", - "test-cluster", dateOne).getIsSLAHighMissed()); + store.putSLAAlertInstance("test-process", "test-cluster", EntityType.PROCESS.toString(), + dateOne, Boolean.TRUE, Boolean.FALSE); + store.updateSLAAlertInstance("test-process", "test-cluster", dateOne, EntityType.PROCESS.toString()); + Assert.assertEquals(Boolean.TRUE, store.getEntityAlertInstance("test-process", + "test-cluster", dateOne, EntityType.PROCESS.toString()).getIsSLAHighMissed()); } private void clear() { EntityManager em = FalconJPAService.get().getEntityManager(); em.getTransaction().begin(); try { - Query query = em.createNativeQuery("delete from MONITORED_FEEDS"); + Query query = em.createNativeQuery("delete from PENDING_INSTANCES"); query.executeUpdate(); - query = em.createNativeQuery("delete from PENDING_INSTANCES"); + query = em.createNativeQuery("delete from MONITORED_ENTITY"); query.executeUpdate(); - query = em.createNativeQuery("delete from FEED_SLA_ALERTS"); + query = em.createNativeQuery("delete from ENTITY_SLA_ALERTS"); query.executeUpdate(); } finally { http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java new file mode 100644 index 0000000..c8b4f5e --- /dev/null +++ b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.service; + +import org.apache.falcon.FalconException; +import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.entity.AbstractTestBase; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.feed.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Sla; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.jdbc.MonitoringJdbcStateStore; +import org.apache.falcon.tools.FalconStateStoreDBCLI; +import org.apache.falcon.util.StateStoreProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.io.File; +import java.util.Date; + +/** + * Test for EntitySLAMonitoringService. + */ +public class EntitySLAAlertServiceTest extends AbstractTestBase { + private static final String DB_BASE_DIR = "target/test-data/persistancedb"; + protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; + protected static String url = "jdbc:derby:"+ dbLocation +";create=true"; + protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; + protected LocalFileSystem fs = new LocalFileSystem(); + + private static MonitoringJdbcStateStore monitoringJdbcStateStore; + private static FalconJPAService falconJPAService = FalconJPAService.get(); + + protected int execDBCLICommands(String[] args) { + return new FalconStateStoreDBCLI().run(args); + } + + public void createDB(String file) { + File sqlFile = new File(file); + String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" }; + int result = execDBCLICommands(argsCreate); + Assert.assertEquals(0, result); + Assert.assertTrue(sqlFile.exists()); + + } + + @BeforeClass + public void setup() throws Exception{ + StateStoreProperties.get().setProperty(FalconJPAService.URL, url); + Configuration localConf = new Configuration(); + fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf); + fs.mkdirs(new Path(DB_BASE_DIR)); + createDB(DB_SQL_FILE); + falconJPAService.init(); + this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); + this.conf = dfsCluster.getConf(); + monitoringJdbcStateStore = new MonitoringJdbcStateStore(); + } + + @BeforeMethod + public void init() { + clear(); + } + + private void clear() { + EntityManager em = FalconJPAService.get().getEntityManager(); + em.getTransaction().begin(); + try { + Query query = em.createNativeQuery("delete from PENDING_INSTANCES"); + query.executeUpdate(); + query = em.createNativeQuery("delete from MONITORED_ENTITY"); + query.executeUpdate(); + query = em.createNativeQuery("delete from ENTITY_SLA_ALERTS"); + query.executeUpdate(); + + } finally { + em.getTransaction().commit(); + em.close(); + } + } + + @Test + public static void processSLALowCandidates() throws FalconException, InterruptedException{ + + Date dateOne = new Date(System.currentTimeMillis()-100000); + monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne, EntityType.FEED.toString()); + org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters(); + org.apache.falcon.entity.v0.cluster.Cluster cluster1 = new org.apache.falcon.entity.v0.cluster.Cluster(); + cluster1.setName("test-cluster"); + Cluster testCluster = new Cluster(); + testCluster.setName("test-cluster"); + cluster.getClusters().add(testCluster); + Feed mockEntity = new Feed(); + mockEntity.setName("test-feed"); + mockEntity.setClusters(cluster); + cluster1.setColo("test-cluster"); + + + if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) { + ConfigurationStore.get().publish(EntityType.FEED, mockEntity); + } + if (ConfigurationStore.get().get(EntityType.CLUSTER, cluster1.getName()) == null) { + ConfigurationStore.get().publish(EntityType.CLUSTER, cluster1); + } + Sla sla = new Sla(); + Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes); + Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes); + sla.setSlaLow(frequencyLow); + sla.setSlaHigh(frequencyHigh); + mockEntity.setSla(sla); + + EntitySLAAlertService.get().init(); + Thread.sleep(10*1000); + Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance("test-feed", "test-cluster", + dateOne, EntityType.FEED.toString()).getIsSLALowMissed()); + } + + @Test + public static void processSLACandidateProcess() throws FalconException, InterruptedException{ + Date dateOne = new Date(System.currentTimeMillis()-130000); + + monitoringJdbcStateStore.putPendingInstances("test-process", "test-cluster", dateOne, + EntityType.PROCESS.name()); + EntitySLAAlertService.get().init(); + org.apache.falcon.entity.v0.process.Clusters cluster = new org.apache.falcon.entity.v0.process.Clusters(); + org.apache.falcon.entity.v0.cluster.Cluster processCluster = new org.apache.falcon.entity.v0.cluster.Cluster(); + processCluster.setName("test-cluster"); + org.apache.falcon.entity.v0.process.Cluster testCluster = new org.apache.falcon.entity.v0.process.Cluster(); + testCluster.setName("test-cluster"); + cluster.getClusters().add(testCluster); + Process process = new Process(); + process.setName("test-process"); + process.setClusters(cluster); + processCluster.setColo("test-cluster"); + + if (ConfigurationStore.get().get(EntityType.PROCESS, process.getName()) == null){ + ConfigurationStore.get().publish(EntityType.PROCESS, process); + } + if (ConfigurationStore.get().get(EntityType.CLUSTER, processCluster.getName()) == null) { + ConfigurationStore.get().publish(EntityType.CLUSTER, processCluster); + } + org.apache.falcon.entity.v0.process.Sla sla = new org.apache.falcon.entity.v0.process.Sla(); + Frequency processFrequency = new Frequency("1", Frequency.TimeUnit.minutes); + sla.setShouldEndIn(processFrequency); + process.setSla(sla); + + + Thread.sleep(10*1000); + Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance("test-process", "test-cluster", dateOne, + EntityType.PROCESS.name()).getIsSLAHighMissed()); + + } + + @Test(expectedExceptions = javax.persistence.NoResultException.class) + public static void processSLAHighCandidates() throws FalconException, InterruptedException{ + + Date dateOne = new Date(System.currentTimeMillis()-130000); + monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne, EntityType.FEED.toString()); + org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters(); + org.apache.falcon.entity.v0.cluster.Cluster cluster1 = new org.apache.falcon.entity.v0.cluster.Cluster(); + cluster1.setName("test-cluster"); + Cluster testCluster = new Cluster(); + testCluster.setName("test-cluster"); + cluster.getClusters().add(testCluster); + Feed mockEntity = new Feed(); + mockEntity.setName("test-feed"); + mockEntity.setClusters(cluster); + cluster1.setColo("test-cluster"); + if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) { + ConfigurationStore.get().publish(EntityType.FEED, mockEntity); + } + if (ConfigurationStore.get().get(EntityType.CLUSTER, cluster1.getName()) == null) { + ConfigurationStore.get().publish(EntityType.CLUSTER, cluster1); + } + Sla sla = new Sla(); + Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes); + Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes); + sla.setSlaLow(frequencyLow); + sla.setSlaHigh(frequencyHigh); + mockEntity.setSla(sla); + + EntitySLAAlertService.get().init(); + Thread.sleep(10*1000); + Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance("test-feed", "test-cluster", + dateOne, EntityType.FEED.toString()).getIsSLAHighMissed()); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java deleted file mode 100644 index 7c886c1..0000000 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.service; - -import org.apache.falcon.FalconException; -import org.apache.falcon.cluster.util.EmbeddedCluster; -import org.apache.falcon.entity.AbstractTestBase; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.feed.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Sla; -import org.apache.falcon.jdbc.MonitoringJdbcStateStore; -import org.apache.falcon.tools.FalconStateStoreDBCLI; -import org.apache.falcon.util.StateStoreProperties; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import javax.persistence.EntityManager; -import javax.persistence.Query; -import java.io.File; -import java.util.Date; - -/** - * Test for FeedSLAMonitoringService. - */ -public class FeedSLAAlertServiceTest extends AbstractTestBase { - private static final String DB_BASE_DIR = "target/test-data/persistancedb"; - protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; - protected static String url = "jdbc:derby:"+ dbLocation +";create=true"; - protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; - protected LocalFileSystem fs = new LocalFileSystem(); - - private static MonitoringJdbcStateStore monitoringJdbcStateStore; - private static FalconJPAService falconJPAService = FalconJPAService.get(); - - protected int execDBCLICommands(String[] args) { - return new FalconStateStoreDBCLI().run(args); - } - - public void createDB(String file) { - File sqlFile = new File(file); - String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" }; - int result = execDBCLICommands(argsCreate); - Assert.assertEquals(0, result); - Assert.assertTrue(sqlFile.exists()); - - } - - @BeforeClass - public void setup() throws Exception{ - StateStoreProperties.get().setProperty(FalconJPAService.URL, url); - Configuration localConf = new Configuration(); - fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf); - fs.mkdirs(new Path(DB_BASE_DIR)); - createDB(DB_SQL_FILE); - falconJPAService.init(); - this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); - this.conf = dfsCluster.getConf(); - monitoringJdbcStateStore = new MonitoringJdbcStateStore(); - } - - @BeforeMethod - public void init() { - clear(); - } - - private void clear() { - EntityManager em = FalconJPAService.get().getEntityManager(); - em.getTransaction().begin(); - try { - Query query = em.createNativeQuery("delete from MONITORED_FEEDS"); - query.executeUpdate(); - query = em.createNativeQuery("delete from PENDING_INSTANCES"); - query.executeUpdate(); - query = em.createNativeQuery("delete from FEED_SLA_ALERTS"); - query.executeUpdate(); - - } finally { - em.getTransaction().commit(); - em.close(); - } - } - - @Test - public static void processSLALowCandidates() throws FalconException, InterruptedException{ - - Date dateOne = new Date(System.currentTimeMillis()-100000); - monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne); - org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters(); - Cluster testCluster = new Cluster(); - testCluster.setName("test-cluster"); - cluster.getClusters().add(testCluster); - Feed mockEntity = new Feed(); - mockEntity.setName("test-feed"); - mockEntity.setClusters(cluster); - if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) { - ConfigurationStore.get().publish(EntityType.FEED, mockEntity); - } - Sla sla = new Sla(); - Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes); - Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes); - sla.setSlaLow(frequencyLow); - sla.setSlaHigh(frequencyHigh); - mockEntity.setSla(sla); - - FeedSLAAlertService.get().init(); - Thread.sleep(10*1000); - Assert.assertTrue(monitoringJdbcStateStore.getFeedAlertInstance("test-feed", "test-cluster", - dateOne).getIsSLALowMissed()); - } - - @Test(expectedExceptions = javax.persistence.NoResultException.class) - public static void processSLAHighCandidates() throws FalconException, InterruptedException{ - - Date dateOne = new Date(System.currentTimeMillis()-130000); - monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne); - org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters(); - Cluster testCluster = new Cluster(); - testCluster.setName("test-cluster"); - cluster.getClusters().add(testCluster); - Feed mockEntity = new Feed(); - mockEntity.setName("test-feed"); - mockEntity.setClusters(cluster); - if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) { - ConfigurationStore.get().publish(EntityType.FEED, mockEntity); - } - Sla sla = new Sla(); - Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes); - Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes); - sla.setSlaLow(frequencyLow); - sla.setSlaHigh(frequencyHigh); - mockEntity.setSla(sla); - - FeedSLAAlertService.get().init(); - Thread.sleep(10*1000); - Assert.assertTrue(monitoringJdbcStateStore.getFeedAlertInstance("test-feed", "test-cluster", - dateOne).getIsSLAHighMissed()); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java index 97cc459..9cf50c2 100644 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java +++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java @@ -54,7 +54,7 @@ public class FeedSLAMonitoringTest extends AbstractTestBase { private static final String CLUSTER_NAME = "testCluster"; private static final String FEED_NAME = "testFeed"; private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); - private static final String TAG_CRITICAL = FeedSLAMonitoringService.get().TAG_CRITICAL; + private static final String TAG_CRITICAL = EntitySLAMonitoringService.get().TAG_CRITICAL; @Test public void testSLAStatus() throws FalconException { @@ -74,7 +74,8 @@ public class FeedSLAMonitoringTest extends AbstractTestBase { missingInstances.add(SchemaHelper.parseDateUTC("2015-05-05T00:00Z")); // equal to end time missingInstances.add(SchemaHelper.parseDateUTC("2015-05-06T00:00Z")); // after end time - Set<Pair<Date, String>> result = FeedSLAMonitoringService.get().getSLAStatus(sla, start, end, missingInstances); + Set<Pair<Date, String>> result = EntitySLAMonitoringService.get().getFeedSLAStatus(sla, start, end, + missingInstances); Set<Pair<Date, String>> expected = new HashSet<>(); expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), TAG_CRITICAL)); expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), TAG_CRITICAL));
