Repository: falcon
Updated Branches:
  refs/heads/master e46e5c4c6 -> f14eca88e


FALCON-2074 Bugs in Process SLA monitoring after dev testing

Author: Praveen Adlakha <[email protected]>

Reviewers: @pallavi-rao

Closes #221 from PraveenAdlakha/processSLATesting


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f14eca88
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f14eca88
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f14eca88

Branch: refs/heads/master
Commit: f14eca88e7875e5c8cc88a31c0e9ae58928f29a1
Parents: e46e5c4
Author: Praveen Adlakha <[email protected]>
Authored: Wed Jul 13 14:47:27 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Wed Jul 13 14:47:27 2016 +0530

----------------------------------------------------------------------
 .../falcon/persistence/MonitoredEntityBean.java |  4 +-
 .../persistence/PersistenceConstants.java       |  3 +-
 .../falcon/jdbc/MonitoringJdbcStateStore.java   | 15 +++++--
 .../falcon/service/EntitySLAAlertService.java   | 19 +++++----
 .../service/EntitySLAMonitoringService.java     | 42 ++++++++++++--------
 .../jdbc/MonitoringJdbcStateStoreTest.java      |  2 +-
 6 files changed, 54 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java 
b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
index 20ce537..1db3d04 100644
--- 
a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
+++ 
b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
@@ -42,7 +42,9 @@ import javax.validation.constraints.NotNull;
                 + "MonitoredEntityBean a where a.entityName = :entityName and 
a.entityType = :entityType"),
         @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, 
query = "delete from MonitoredEntityBean "
                 + "a where a.entityName = :entityName and a.entityType = 
:entityType"),
-        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, 
query = "select OBJECT(a) "
+        @NamedQuery(name = 
PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE, query = "select 
OBJECT(a) "
+                + "from MonitoredEntityBean a where a.entityType = 
:entityType"),
+        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITY, 
query = "select OBJECT(a) "
                 + "from MonitoredEntityBean a")
 })
 @Table(name="MONITORED_ENTITY")

http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java 
b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index f9aa1f5..7c2479d 100644
--- 
a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ 
b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -26,7 +26,7 @@ public final class PersistenceConstants {
     }
     public static final String GET_MONITERED_INSTANCE = 
"GET_MONITERED_INSTANCE";
     public static final String DELETE_MONITORED_INSTANCES = 
"DELETE_MONITORED_INSTANCES";
-    public static final String GET_ALL_MONITORING_FEEDS = 
"GET_ALL_MONITORING_FEEDS";
+    public static final String GET_ALL_MONITORING_ENTITY_FOR_TYPE = 
"GET_ALL_MONITORING_ENTITY_FOR_TYPE";
     public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES";
     public static final String GET_PENDING_INSTANCE = "GET_PENDING_INSTANCE";
     public static final String DELETE_PENDING_NOMINAL_INSTANCES = 
"DELETE_PENDING_NOMINAL_INSTANCES";
@@ -61,4 +61,5 @@ public final class PersistenceConstants {
     public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";
     public static final String GET_ENTITY_ALERT_INSTANCE = 
"GET_ENTITY_ALERT_INSTANCE";
     public static final String DELETE_ENTITY_ALERT_INSTANCE = 
"DELETE_ENTITY_ALERT_INSTANCE";
+    public static final String GET_ALL_MONITORING_ENTITY = 
"GET_ALL_MONITORING_ENTITY";
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java 
b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index c1f818a..6a38b0a 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -87,9 +87,18 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public List<MonitoredEntityBean> getAllMonitoredFeed() throws 
ResultNotFoundException {
+    public List<MonitoredEntityBean> getAllMonitoredEntity() throws 
ResultNotFoundException {
         EntityManager entityManager = getEntityManager();
-        Query q = 
entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS);
+        Query q = 
entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY);
+        List result = q.getResultList();
+        entityManager.close();
+        return result;
+    }
+
+    public List<MonitoredEntityBean> getAllMonitoredEntityForEntity(String 
entityType) throws ResultNotFoundException {
+        EntityManager entityManager = getEntityManager();
+        Query q = 
entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         List result = q.getResultList();
         entityManager.close();
         return result;
@@ -159,7 +168,7 @@ public class MonitoringJdbcStateStore {
         return result;
     }
 
-    public List<PendingInstanceBean> getAllInstances(){
+    public List<PendingInstanceBean> getAllPendingInstances(){
         EntityManager entityManager = getEntityManager();
         Query q = 
entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);
         List result = q.getResultList();

http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java 
b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
index f023c35..57e46b7 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -69,13 +69,15 @@ public final class EntitySLAAlertService implements 
FalconService, EntitySLAList
     public void init() throws FalconException {
         String listenerClassNames = StartupProperties.get().
                 getProperty("feedAlert.listeners");
-        for (String listenerClassName : listenerClassNames.split(",")) {
-            listenerClassName = listenerClassName.trim();
-            if (listenerClassName.isEmpty()) {
-                continue;
+        if (listenerClassNames != null && !listenerClassNames.isEmpty()) {
+            for (String listenerClassName : listenerClassNames.split(",")) {
+                listenerClassName = listenerClassName.trim();
+                if (listenerClassName.isEmpty()) {
+                    continue;
+                }
+                EntitySLAListener listener = 
ReflectionUtils.getInstanceByClassName(listenerClassName);
+                registerListener(listener);
             }
-            EntitySLAListener listener = 
ReflectionUtils.getInstanceByClassName(listenerClassName);
-            registerListener(listener);
         }
 
         String freq = 
StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", 
"600");
@@ -105,7 +107,7 @@ public final class EntitySLAAlertService implements 
FalconService, EntitySLAList
 
     void processSLACandidates(){
         //Get all feeds instances to be monitored
-        List<PendingInstanceBean> pendingInstanceBeanList = 
store.getAllInstances();
+        List<PendingInstanceBean> pendingInstanceBeanList = 
store.getAllPendingInstances();
         if (pendingInstanceBeanList == null || 
pendingInstanceBeanList.isEmpty()){
             return;
         }
@@ -152,7 +154,7 @@ public final class EntitySLAAlertService implements 
FalconService, EntitySLAList
                 }
             }
         } catch (FalconException e){
-            LOG.error("Exception in FeedSLAALertService:", e);
+            LOG.error("Exception in EntitySLAALertService:", e);
         }
 
     }
@@ -160,6 +162,7 @@ public final class EntitySLAAlertService implements 
FalconService, EntitySLAList
     @Override
     public void highSLAMissed(String entityName, String clusterName, String 
entityType , Date nominalTime
                               ) throws FalconException {
+        LOG.debug("Listners called...");
         for (EntitySLAListener listener : listeners) {
             listener.highSLAMissed(entityName, clusterName, entityType, 
nominalTime);
             store.deleteEntityAlertInstance(entityName, clusterName, 
nominalTime, entityType);

http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java 
b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
index f931625..6616f8b 100644
--- 
a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
+++ 
b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -344,7 +344,7 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
         @Override
         public void run() {
             try {
-                if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 
0) {
+                if (MONITORING_JDBC_STATE_STORE.getAllMonitoredEntity().size() 
> 0) {
                     
checkPendingInstanceAvailability(EntityType.FEED.toString());
                     
checkPendingInstanceAvailability(EntityType.PROCESS.toString());
 
@@ -363,31 +363,34 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
 
     void addNewPendingFeedInstances(Date to, String entityType) throws 
FalconException {
         Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
-        List<MonitoredEntityBean> feedsBeanList = 
MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
-        for(MonitoredEntityBean monitoredEntityBean : feedsBeanList) {
+        List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE.
+                getAllMonitoredEntityForEntity(entityType);
+        for(MonitoredEntityBean monitoredEntityBean : entityBeanList) {
             String entityName = monitoredEntityBean.getFeedName();
             Entity entity = EntityUtil.getEntity(entityType, entityName);
+            LOG.debug("entityName:"+ entityName+"entity:"+entity);
             Set<String> clusters =  EntityUtil.getClustersDefined(entity);
             List<org.apache.falcon.entity.v0.cluster.Cluster> cluster = new 
ArrayList();
             for(String string : clusters){
                 cluster.add(ClusterHelper.getCluster(string));
             }
-            for (org.apache.falcon.entity.v0.cluster.Cluster feedCluster : 
cluster) {
-                if (currentClusters.contains(feedCluster.getName())) {
+            for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : 
cluster) {
+                if (currentClusters.contains(entityCluster.getName())) {
                     // get start of instances from the database
                     Date nextInstanceTime = 
MONITORING_JDBC_STATE_STORE.getLastInstanceTime(entityName,
-                            EntityType.FEED.toString());
-                    Pair<String, String> key = new Pair<>(entity.getName(), 
feedCluster.getName());
+                            entityType);
+                    Pair<String, String> key = new Pair<>(entity.getName(), 
entityCluster.getName());
                     if (nextInstanceTime == null) {
-                        nextInstanceTime = getInitialStartTime(entity, 
feedCluster.getName(), entityType);
+                        nextInstanceTime = getInitialStartTime(entity, 
entityCluster.getName(), entityType);
                     } else {
                         nextInstanceTime = new Date(nextInstanceTime.getTime() 
+ ONE_MS);
                     }
 
                     Set<Date> instances = new HashSet<>();
                     org.apache.falcon.entity.v0.cluster.Cluster currentCluster 
=
-                            EntityUtil.getEntity(EntityType.CLUSTER, 
feedCluster.getName());
+                            EntityUtil.getEntity(EntityType.CLUSTER, 
entityCluster.getName());
                     nextInstanceTime = EntityUtil.getNextStartTime(entity, 
currentCluster, nextInstanceTime);
+                    LOG.info("nextInstanceTime:"+ nextInstanceTime + 
"entityName:"+entityName);
                     Date endDate;
                     if (entityType.equals(EntityType.FEED.toString())){
                         endDate =  FeedHelper.getClusterValidity((Feed) 
entity, currentCluster.getName()).getEnd();
@@ -396,14 +399,14 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
                                 currentCluster.getName()).getEnd();
                     }
                     while (nextInstanceTime.before(to) && 
nextInstanceTime.before(endDate)) {
-                        LOG.debug("Adding instance={} for <feed,cluster>={}", 
nextInstanceTime, key);
+                        LOG.info("Adding instance={} for <entity,cluster>={}", 
nextInstanceTime, key);
                         instances.add(nextInstanceTime);
                         nextInstanceTime = new Date(nextInstanceTime.getTime() 
+ ONE_MS);
                         nextInstanceTime = EntityUtil.getNextStartTime(entity, 
currentCluster, nextInstanceTime);
                     }
 
                     for(Date date:instances){
-                        
MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), 
feedCluster.getName(), date,
+                        
MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), 
entityCluster.getName(), date,
                                 entityType);
                     }
                 }
@@ -416,7 +419,12 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
      * Checks the availability of all the pendingInstances and removes the 
ones which have become available.
      */
     private void checkPendingInstanceAvailability(String entityType) throws 
FalconException {
-        for(PendingInstanceBean pendingInstanceBean : 
MONITORING_JDBC_STATE_STORE.getAllInstances()){
+        LOG.debug("Size 
"+MONITORING_JDBC_STATE_STORE.getAllMonitoredEntity().size());
+        if (MONITORING_JDBC_STATE_STORE.getAllPendingInstances() == null){
+            LOG.info("Returning as size of pending instance is zero");
+            return;
+        }
+        for(PendingInstanceBean pendingInstanceBean : 
MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){
             for (Date date : 
MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(),
                     pendingInstanceBean.getClusterName(), entityType)) {
                 boolean status = 
checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(),
@@ -462,10 +470,10 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
                 }
             }
         } catch (Throwable e) {
-            LOG.error("Couldn't find status for Entity:{}, cluster:{}, 
entityType{}", entityName, clusterName,
-                    entityType, e);
+            LOG.error("Couldn't find status for Entity:{}, cluster:{}, 
entityType{}, nominalTime{}", entityName,
+                    clusterName, entityType, nominalTime, e);
         }
-        LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not 
available.", entity.getName(),
+        LOG.debug("Entity instance(entity:{}, cluster:{}, instanceTime:{}) is 
not available.", entity.getName(),
             clusterName, nominalTime);
         return false;
     }
@@ -486,7 +494,7 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
     public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(Date 
start, Date end)
         throws FalconException {
         Set<SchedulableEntityInstance> result = new HashSet<>();
-        for(PendingInstanceBean pendingInstanceBean : 
MONITORING_JDBC_STATE_STORE.getAllInstances()){
+        for(PendingInstanceBean pendingInstanceBean : 
MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){
             Pair<String, String> feedClusterPair = new 
Pair<>(pendingInstanceBean.getEntityName(),
                     pendingInstanceBean.getClusterName());
             Feed feed = EntityUtil.getEntity(EntityType.FEED, 
feedClusterPair.first);
@@ -602,7 +610,7 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
 
     @VisibleForTesting
     Date getInitialStartTime(Entity entity, String clusterName, String 
entityType) throws FalconException {
-        if (entityType.equals(EntityType.PROCESS.toString())){
+        if (entityType.equals(EntityType.FEED.toString())){
             Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity);
             if (sla == null) {
                 throw new IllegalStateException("InitialStartTime can not be 
determined as the feed: "

http://git-wip-us.apache.org/repos/asf/falcon/blob/f14eca88/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git 
a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java 
b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
index 8cf2b2d..018c562 100644
--- 
a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
+++ 
b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -90,7 +90,7 @@ public class MonitoringJdbcStateStoreTest extends 
AbstractTestBase {
         monitoringJdbcStateStore.putMonitoredEntity("test_feed2", 
EntityType.FEED.toString());
         Assert.assertEquals("test_feed1", 
monitoringJdbcStateStore.getMonitoredEntity("test_feed1",
                 EntityType.FEED.toString()).getFeedName());
-        
Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(), 2);
+        
Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredEntity().size(), 2);
 
         monitoringJdbcStateStore.deleteMonitoringEntity("test_feed1", 
EntityType.FEED.toString());
         monitoringJdbcStateStore.deleteMonitoringEntity("test_feed2", 
EntityType.FEED.toString());

Reply via email to