Repository: falcon Updated Branches: refs/heads/master e46e5c4c6 -> f14eca88e
FALCON-2074 Bugs in Process SLA monitoring after dev testing Author: Praveen Adlakha <[email protected]> Reviewers: @pallavi-rao Closes #221 from PraveenAdlakha/processSLATesting Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f14eca88 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f14eca88 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f14eca88 Branch: refs/heads/master Commit: f14eca88e7875e5c8cc88a31c0e9ae58928f29a1 Parents: e46e5c4 Author: Praveen Adlakha <[email protected]> Authored: Wed Jul 13 14:47:27 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Wed Jul 13 14:47:27 2016 +0530 ---------------------------------------------------------------------- .../falcon/persistence/MonitoredEntityBean.java | 4 +- .../persistence/PersistenceConstants.java | 3 +- .../falcon/jdbc/MonitoringJdbcStateStore.java | 15 +++++-- .../falcon/service/EntitySLAAlertService.java | 19 +++++---- .../service/EntitySLAMonitoringService.java | 42 ++++++++++++-------- .../jdbc/MonitoringJdbcStateStoreTest.java | 2 +- 6 files changed, 54 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java index 20ce537..1db3d04 100644 --- a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java @@ -42,7 +42,9 @@ import javax.validation.constraints.NotNull; + "MonitoredEntityBean a where a.entityName = :entityName and a.entityType = :entityType"), @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredEntityBean " + "a where a.entityName = :entityName and a.entityType = :entityType"), - @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) " + @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE, query = "select OBJECT(a) " + + "from MonitoredEntityBean a where a.entityType = :entityType"), + @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITY, query = "select OBJECT(a) " + "from MonitoredEntityBean a") }) @Table(name="MONITORED_ENTITY") http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index f9aa1f5..7c2479d 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -26,7 +26,7 @@ public final class PersistenceConstants { } public static final String GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE"; public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES"; - public static final String GET_ALL_MONITORING_FEEDS = "GET_ALL_MONITORING_FEEDS"; + public static final String GET_ALL_MONITORING_ENTITY_FOR_TYPE = "GET_ALL_MONITORING_ENTITY_FOR_TYPE"; public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES"; public static final String GET_PENDING_INSTANCE = "GET_PENDING_INSTANCE"; public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES"; @@ -61,4 +61,5 @@ public final class PersistenceConstants { public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH"; public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE"; public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE"; + public static final String GET_ALL_MONITORING_ENTITY = "GET_ALL_MONITORING_ENTITY"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index c1f818a..6a38b0a 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -87,9 +87,18 @@ public class MonitoringJdbcStateStore { } } - public List<MonitoredEntityBean> getAllMonitoredFeed() throws ResultNotFoundException { + public List<MonitoredEntityBean> getAllMonitoredEntity() throws ResultNotFoundException { EntityManager entityManager = getEntityManager(); - Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY); + List result = q.getResultList(); + entityManager.close(); + return result; + } + + public List<MonitoredEntityBean> getAllMonitoredEntityForEntity(String entityType) throws ResultNotFoundException { + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); List result = q.getResultList(); entityManager.close(); return result; @@ -159,7 +168,7 @@ public class MonitoringJdbcStateStore { return result; } - public List<PendingInstanceBean> getAllInstances(){ + public List<PendingInstanceBean> getAllPendingInstances(){ EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES); List result = q.getResultList(); http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java index f023c35..57e46b7 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java @@ -69,13 +69,15 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList public void init() throws FalconException { String listenerClassNames = StartupProperties.get(). getProperty("feedAlert.listeners"); - for (String listenerClassName : listenerClassNames.split(",")) { - listenerClassName = listenerClassName.trim(); - if (listenerClassName.isEmpty()) { - continue; + if (listenerClassNames != null && !listenerClassNames.isEmpty()) { + for (String listenerClassName : listenerClassNames.split(",")) { + listenerClassName = listenerClassName.trim(); + if (listenerClassName.isEmpty()) { + continue; + } + EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName); + registerListener(listener); } - EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName); - registerListener(listener); } String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600"); @@ -105,7 +107,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList void processSLACandidates(){ //Get all feeds instances to be monitored - List<PendingInstanceBean> pendingInstanceBeanList = store.getAllInstances(); + List<PendingInstanceBean> pendingInstanceBeanList = store.getAllPendingInstances(); if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){ return; } @@ -152,7 +154,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList } } } catch (FalconException e){ - LOG.error("Exception in FeedSLAALertService:", e); + LOG.error("Exception in EntitySLAALertService:", e); } } @@ -160,6 +162,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList @Override public void highSLAMissed(String entityName, String clusterName, String entityType , Date nominalTime ) throws FalconException { + LOG.debug("Listners called..."); for (EntitySLAListener listener : listeners) { listener.highSLAMissed(entityName, clusterName, entityType, nominalTime); store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType); http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index f931625..6616f8b 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -344,7 +344,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList @Override public void run() { try { - if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) { + if (MONITORING_JDBC_STATE_STORE.getAllMonitoredEntity().size() > 0) { checkPendingInstanceAvailability(EntityType.FEED.toString()); checkPendingInstanceAvailability(EntityType.PROCESS.toString()); @@ -363,31 +363,34 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList void addNewPendingFeedInstances(Date to, String entityType) throws FalconException { Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); - List<MonitoredEntityBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed(); - for(MonitoredEntityBean monitoredEntityBean : feedsBeanList) { + List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE. + getAllMonitoredEntityForEntity(entityType); + for(MonitoredEntityBean monitoredEntityBean : entityBeanList) { String entityName = monitoredEntityBean.getFeedName(); Entity entity = EntityUtil.getEntity(entityType, entityName); + LOG.debug("entityName:"+ entityName+"entity:"+entity); Set<String> clusters = EntityUtil.getClustersDefined(entity); List<org.apache.falcon.entity.v0.cluster.Cluster> cluster = new ArrayList(); for(String string : clusters){ cluster.add(ClusterHelper.getCluster(string)); } - for (org.apache.falcon.entity.v0.cluster.Cluster feedCluster : cluster) { - if (currentClusters.contains(feedCluster.getName())) { + for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : cluster) { + if (currentClusters.contains(entityCluster.getName())) { // get start of instances from the database Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(entityName, - EntityType.FEED.toString()); - Pair<String, String> key = new Pair<>(entity.getName(), feedCluster.getName()); + entityType); + Pair<String, String> key = new Pair<>(entity.getName(), entityCluster.getName()); if (nextInstanceTime == null) { - nextInstanceTime = getInitialStartTime(entity, feedCluster.getName(), entityType); + nextInstanceTime = getInitialStartTime(entity, entityCluster.getName(), entityType); } else { nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); } Set<Date> instances = new HashSet<>(); org.apache.falcon.entity.v0.cluster.Cluster currentCluster = - EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName()); + EntityUtil.getEntity(EntityType.CLUSTER, entityCluster.getName()); nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime); + LOG.info("nextInstanceTime:"+ nextInstanceTime + "entityName:"+entityName); Date endDate; if (entityType.equals(EntityType.FEED.toString())){ endDate = FeedHelper.getClusterValidity((Feed) entity, currentCluster.getName()).getEnd(); @@ -396,14 +399,14 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList currentCluster.getName()).getEnd(); } while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) { - LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key); + LOG.info("Adding instance={} for <entity,cluster>={}", nextInstanceTime, key); instances.add(nextInstanceTime); nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime); } for(Date date:instances){ - MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), feedCluster.getName(), date, + MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), entityCluster.getName(), date, entityType); } } @@ -416,7 +419,12 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList * Checks the availability of all the pendingInstances and removes the ones which have become available. */ private void checkPendingInstanceAvailability(String entityType) throws FalconException { - for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){ + LOG.debug("Size "+MONITORING_JDBC_STATE_STORE.getAllMonitoredEntity().size()); + if (MONITORING_JDBC_STATE_STORE.getAllPendingInstances() == null){ + LOG.info("Returning as size of pending instance is zero"); + return; + } + for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){ for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(), pendingInstanceBean.getClusterName(), entityType)) { boolean status = checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(), @@ -462,10 +470,10 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList } } } catch (Throwable e) { - LOG.error("Couldn't find status for Entity:{}, cluster:{}, entityType{}", entityName, clusterName, - entityType, e); + LOG.error("Couldn't find status for Entity:{}, cluster:{}, entityType{}, nominalTime{}", entityName, + clusterName, entityType, nominalTime, e); } - LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not available.", entity.getName(), + LOG.debug("Entity instance(entity:{}, cluster:{}, instanceTime:{}) is not available.", entity.getName(), clusterName, nominalTime); return false; } @@ -486,7 +494,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(Date start, Date end) throws FalconException { Set<SchedulableEntityInstance> result = new HashSet<>(); - for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){ + for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){ Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getEntityName(), pendingInstanceBean.getClusterName()); Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first); @@ -602,7 +610,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList @VisibleForTesting Date getInitialStartTime(Entity entity, String clusterName, String entityType) throws FalconException { - if (entityType.equals(EntityType.PROCESS.toString())){ + if (entityType.equals(EntityType.FEED.toString())){ Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity); if (sla == null) { throw new IllegalStateException("InitialStartTime can not be determined as the feed: " http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java index 8cf2b2d..018c562 100644 --- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -90,7 +90,7 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { monitoringJdbcStateStore.putMonitoredEntity("test_feed2", EntityType.FEED.toString()); Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredEntity("test_feed1", EntityType.FEED.toString()).getFeedName()); - Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(), 2); + Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredEntity().size(), 2); monitoringJdbcStateStore.deleteMonitoringEntity("test_feed1", EntityType.FEED.toString()); monitoringJdbcStateStore.deleteMonitoringEntity("test_feed2", EntityType.FEED.toString());
