Repository: falcon
Updated Branches:
  refs/heads/master 1f28bde6f -> 37cb056b9


FALCON-2204 Change mode for falcon_merge_pr.py to executable

This pull request changes the file mode of falcon_merge_pr.py from 100644 to 100755 so that the script is executable.

Author: Ajay Yadava <[email protected]>

Reviewers: @pallavi-rao, @sandeepSamudrala

Closes #310 from ajayyadava/2204


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/37cb056b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/37cb056b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/37cb056b

Branch: refs/heads/master
Commit: 37cb056b93b6e414172e12605c33c27223d2800e
Parents: 1f28bde
Author: Ajay Yadava <[email protected]>
Authored: Thu Dec 1 09:33:13 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Thu Dec 1 09:33:13 2016 +0530

----------------------------------------------------------------------
 .../falcon/workflow/WorkflowExecutionArgs.java  |   2 +-
 falcon_merge_pr.py                              |   0
 .../org/apache/falcon/logging/JobLogMover.java  |   7 +
 .../falcon/jdbc/MonitoringJdbcStateStore.java   |  11 +-
 .../AbstractSchedulableEntityManager.java       |  21 ++-
 .../service/BacklogMetricEmitterService.java    |   8 +-
 .../service/EntitySLAMonitoringService.java     | 160 +++++++++++--------
 7 files changed, 121 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/37cb056b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 2171092..682b14e 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -41,7 +41,7 @@ public enum WorkflowExecutionArgs {
     DATASOURCE_NAME("datasource", "name of the datasource", false),
 
     // who
-    WORKFLOW_USER("workflowUser", "user who owns the feed instance 
(partition)"),
+    WORKFLOW_USER("workflowUser", "user who ran the instance"),
 
     // what
     // workflow details

http://git-wip-us.apache.org/repos/asf/falcon/blob/37cb056b/falcon_merge_pr.py
----------------------------------------------------------------------
diff --git a/falcon_merge_pr.py b/falcon_merge_pr.py
old mode 100644
new mode 100755

http://git-wip-us.apache.org/repos/asf/falcon/blob/37cb056b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java 
b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 535f62a..5023db3 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
@@ -91,6 +92,12 @@ public class JobLogMover {
                         context.getWorkflowId(), context.getWorkflowStatus());
                 return 0;
             }
+            String instanceOwner = context.getWorkflowUser();
+            if (StringUtils.isNotBlank(instanceOwner)) {
+                CurrentUser.authenticate(instanceOwner);
+            } else {
+                CurrentUser.authenticate(System.getProperty("user.name"));
+            }
             OozieClient client = new OozieClient(engineUrl);
             WorkflowJob jobInfo;
             try {

http://git-wip-us.apache.org/repos/asf/falcon/blob/37cb056b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java 
b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index 669e18d..7f5776e 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -17,8 +17,6 @@
  */
 package org.apache.falcon.jdbc;
 
-import org.apache.commons.collections.CollectionUtils;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.persistence.MonitoredEntityBean;
 import org.apache.falcon.persistence.PendingInstanceBean;
@@ -188,14 +186,7 @@ public class MonitoringJdbcStateStore {
         EntityManager entityManager = getEntityManager();
         Query q = 
entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);
         List result = q.getResultList();
-
-        try {
-            if (CollectionUtils.isEmpty(result)) {
-                return null;
-            }
-        } finally{
-            entityManager.close();
-        }
+        entityManager.close();
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/37cb056b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
 
b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index ef9cf69..c5b3ecc 100644
--- 
a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ 
b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -164,27 +164,32 @@ public abstract class AbstractSchedulableEntityManager 
extends AbstractInstanceM
      * @param endStr
      */
     public SchedulableEntityInstanceResult 
getEntitySLAMissPendingAlerts(String entityName, String entityType,
-                                                                         
String startStr, String endStr, String colo) {
-
+                                                                         
String startStr, String endStr,
+                                                                         
String colo) {
         Set<SchedulableEntityInstance> instances = new HashSet<>();
+        String resultMessage = "Success!";
         try {
             checkColo(colo);
             Date start = EntityUtil.parseDateUTC(startStr);
             Date end = (endStr == null) ? new Date() : 
EntityUtil.parseDateUTC(endStr);
-
             if (StringUtils.isBlank(entityName)) {
-                
instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start,
 end));
+                instances = 
EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end);
             } else {
-                for (String clusterName : DeploymentUtil.getCurrentClusters()) 
{
-                    
instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(entityName,
-                            clusterName, start, end, entityType));
+                String status = 
getStatusString(EntityUtil.getEntity(entityType, entityName));
+                if (status.equals(EntityStatus.RUNNING.name())) {
+                    for (String clusterName : 
DeploymentUtil.getCurrentClusters()) {
+                        
instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(entityName,
+                                clusterName, start, end, entityType));
+                    }
+                } else {
+                    resultMessage = entityName + " is " + status;
                 }
             }
         } catch (FalconException e) {
             throw FalconWebException.newAPIException(e);
         }
         SchedulableEntityInstanceResult result = new 
SchedulableEntityInstanceResult(APIResult.Status.SUCCEEDED,
-                "Success!");
+                resultMessage);
         result.setCollection(instances.toArray());
         return result;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/37cb056b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
 
b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
index b01b181..7688619 100644
--- 
a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
+++ 
b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -80,6 +81,9 @@ public final class BacklogMetricEmitterService implements 
FalconService,
     private static MetricNotificationService metricNotificationService =
             Services.get().getService(MetricNotificationService.SERVICE_NAME);
 
+    private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
+            Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
+
     public static BacklogMetricEmitterService get() {
         return SERVICE;
     }
@@ -149,7 +153,7 @@ public final class BacklogMetricEmitterService implements 
FalconService,
             for(Cluster cluster : process.getClusters().getClusters()){
                 dropMetric(cluster.getName(), process);
             }
-        }else{
+        } else {
             addToBacklog(newEntity);
         }
     }
@@ -412,7 +416,7 @@ public final class BacklogMetricEmitterService implements 
FalconService,
                                         continue;
                                     }
                                     InstancesResult status = 
wfEngine.getStatus(entity, nominalTime,
-                                            nominalTime, null, null);
+                                            new Date(nominalTime.getTime() + 
200), PROCESS_LIFE_CYCLE, false);
                                     if (status.getInstances().length > 0
                                             && status.getInstances()[0].status 
== InstancesResult.
                                             WorkflowStatus.SUCCEEDED) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/37cb056b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java 
b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
index 00e116b..451fb95 100644
--- 
a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
+++ 
b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -20,6 +20,7 @@ package org.apache.falcon.service;
 import com.google.common.annotations.VisibleForTesting;
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
@@ -27,7 +28,9 @@ import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Pair;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
@@ -47,7 +50,6 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
 import org.apache.falcon.persistence.MonitoredEntityBean;
 import org.apache.falcon.persistence.PendingInstanceBean;
-import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.SchedulableEntityInstance;
 import org.apache.falcon.security.CurrentUser;
@@ -82,6 +84,9 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
     public static final String TAG_WARN = "Missed-SLA-Low";
     private static final long MINUTE_DELAY = 60000L;
 
+    private static final List<LifeCycle> PROCESS_LIFE_CYCLE =
+            Arrays.asList(LifeCycle.valueOf(LifeCycle.EXECUTION.name()));
+
     private EntitySLAMonitoringService() {
 
     }
@@ -127,22 +132,27 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
 
     @Override
     public void onAdd(Entity entity) throws FalconException {
+        startEntityMonitoring(entity, false);
+    }
+
+    private void startEntityMonitoring(Entity entity, boolean isEntityUpdated) 
throws FalconException{
         Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
         Set<String> clustersDefined = EntityUtil.getClustersDefined(entity);
         if (entity.getEntityType() == EntityType.FEED) {
             Feed feed = (Feed) entity;
-            // currently sla service is enabled only for fileSystemStorage
+            // currently sla service for feed is enabled only for 
fileSystemStorage
             if (feed.getLocations() != null || feed.getSla() != null || 
checkFeedClusterSLA(feed)) {
                 for (String cluster : clustersDefined) {
                     if (currentClusters.contains(cluster)) {
                         if (FeedHelper.getSLA(cluster, feed) != null) {
                             LOG.debug("Adding feed:{} for monitoring", 
feed.getName());
-                            
MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), 
EntityType.FEED.toString(),
-                                    new Date(now().getTime() + MINUTE_DELAY));
-                            List<Date> instances = 
EntityUtil.getEntityInstanceTimesInBetween(entity, cluster,
-                                    getStartTime(entity, cluster), now());
-                            
addPendingInstances(entity.getEntityType().name().toLowerCase(), entity, 
cluster,
-                                    instances);
+                            if (isEntityUpdated) {
+                                
MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(),
+                                        EntityType.FEED.toString(), now());
+                            } else {
+                                
MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(),
+                                        EntityType.FEED.toString(), 
getStartTime(entity, cluster));
+                            }
                         }
                     }
                 }
@@ -153,11 +163,13 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
                 for (String cluster : clustersDefined) {
                     if (currentClusters.contains(cluster)) {
                         LOG.debug("Adding process:{} for monitoring", 
process.getName());
-                        
MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(),
-                                EntityType.PROCESS.toString(), new 
Date(now().getTime() + MINUTE_DELAY));
-                        List<Date> instances = 
EntityUtil.getEntityInstanceTimesInBetween(entity, cluster,
-                                getStartTime(entity, cluster), now());
-                        
addPendingInstances(entity.getEntityType().name().toLowerCase(), entity, 
cluster, instances);
+                        if (isEntityUpdated) {
+                            
MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(),
+                                    EntityType.PROCESS.toString(), now());
+                        } else {
+                            
MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(),
+                                    EntityType.PROCESS.toString(), 
getStartTime(entity, cluster));
+                        }
                     }
                 }
             }
@@ -298,20 +310,14 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
 
     @Override
     public void init() throws FalconException {
-        String uri = 
StartupProperties.get().getProperty("entity.sla.service.store.uri");
-        storePath = new Path(uri);
-        filePath = new Path(storePath, "entitySLAMonitoringService");
-        fileSystem = initializeFileSystem();
 
         String freq = 
StartupProperties.get().getProperty("entity.sla.statusCheck.frequency.seconds", 
"600");
         statusCheckFrequencySeconds = Integer.parseInt(freq);
 
         freq = 
StartupProperties.get().getProperty("entity.sla.lookAheadWindow.millis", 
"900000");
         lookAheadWindowMillis = Integer.parseInt(freq);
-        LOG.info("Initializing EntitySLAMonitoringService from ", 
filePath.toString());
         ScheduledThreadPoolExecutor executor = new 
ScheduledThreadPoolExecutor(1);
-        addPendingEntityInstances(EntityType.FEED.name(), null, now());
-        addPendingEntityInstances(EntityType.PROCESS.name(), null, now());
+        addPendingEntityInstances(now());
         executor.scheduleWithFixedDelay(new Monitor(), 0, 
statusCheckFrequencySeconds, TimeUnit.SECONDS);
     }
 
@@ -353,13 +359,11 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
         public void run() {
             try {
                 if 
(MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities().size() > 0) {
-                    
checkPendingInstanceAvailability(EntityType.FEED.toString());
-                    
checkPendingInstanceAvailability(EntityType.PROCESS.toString());
+                    checkPendingInstanceAvailability();
 
                     // add Instances from last checked time to 10 minutes from 
now(some buffer for status check)
                     Date newCheckPointTime = new Date(now().getTime() + 
lookAheadWindowMillis);
-                    addPendingEntityInstances(EntityType.FEED.toString(), 
null, newCheckPointTime);
-                    addPendingEntityInstances(EntityType.PROCESS.toString(), 
null, newCheckPointTime);
+                    addPendingEntityInstances(newCheckPointTime);
                 }
             } catch (Throwable e) {
                 LOG.error("Feed SLA monitoring failed: ", e);
@@ -380,29 +384,35 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
         }
     }
 
-    void addPendingEntityInstances(String entityType, Date startTime, Date 
endTime) throws FalconException {
+    void addPendingEntityInstances(Date checkPointTime) throws FalconException 
{
         Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
-        List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE.
-                getAllMonitoredEntities(entityType);
+        List<MonitoredEntityBean> entityBeanList = 
MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities();
         for(MonitoredEntityBean monitoredEntityBean : entityBeanList) {
             String entityName = monitoredEntityBean.getEntityName();
-            Date lastMonitoredInstanceTime = (startTime != null) ? startTime
-                    : monitoredEntityBean.getLastMonitoredTime();
-            Date newCheckPointTime = endTime != null ? endTime : now();
-            Entity entity = EntityUtil.getEntity(entityType, entityName);
-            Set<String> clustersDefined =  
EntityUtil.getClustersDefined(entity);
-            List<org.apache.falcon.entity.v0.cluster.Cluster> clusters = new 
ArrayList();
-            for(String cluster : clustersDefined){
-                clusters.add(ClusterHelper.getCluster(cluster));
-            }
-            for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : 
clusters) {
-                if (currentClusters.contains(entityCluster.getName())) {
-                    List<Date> instances = 
EntityUtil.getEntityInstanceTimesInBetween(entity, entityCluster.getName(),
-                            lastMonitoredInstanceTime, newCheckPointTime);
-                    addPendingInstances(entityType, entity, 
entityCluster.getName(), instances);
-                    // update last monitored time with the new checkpoint time
-                    
MONITORING_JDBC_STATE_STORE.updateLastMonitoredTime(entityName, entityType,
-                            new Date(newCheckPointTime.getTime() + 
MINUTE_DELAY));
+            String entityType = monitoredEntityBean.getEntityType();
+            if (EntityType.FEED.name().equalsIgnoreCase(entityType)
+                    || isEntityRunning(EntityUtil.getEntity(entityType, 
entityName))) {
+                Date lastMonitoredInstanceTime = 
monitoredEntityBean.getLastMonitoredTime();
+                Date newCheckPointTime = checkPointTime;
+                Entity entity = EntityUtil.getEntity(entityType, entityName);
+                Set<String> clustersDefined = 
EntityUtil.getClustersDefined(entity);
+                List<org.apache.falcon.entity.v0.cluster.Cluster> clusters = 
new ArrayList();
+                for (String cluster : clustersDefined) {
+                    clusters.add(ClusterHelper.getCluster(cluster));
+                }
+                for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster 
: clusters) {
+                    if (currentClusters.contains(entityCluster.getName())) {
+                        Date endTime = EntityUtil.getEndTime(entity, 
entityCluster.getName());
+                        if (endTime.before(now())) {
+                            newCheckPointTime = endTime;
+                        }
+                        List<Date> instances = 
EntityUtil.getEntityInstanceTimesInBetween(entity,
+                                entityCluster.getName(), 
lastMonitoredInstanceTime, newCheckPointTime);
+                        addPendingInstances(entityType, entity, 
entityCluster.getName(), instances);
+                        // update last monitored time with the new checkpoint 
time
+                        
MONITORING_JDBC_STATE_STORE.updateLastMonitoredTime(entityName, entityType,
+                                new Date(newCheckPointTime.getTime() + 
MINUTE_DELAY));
+                    }
                 }
             }
         }
@@ -412,20 +422,20 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
     /**
      * Checks the availability of all the pendingInstances and removes the 
ones which have become available.
      */
-    private void checkPendingInstanceAvailability(String entityType) throws 
FalconException {
-        if (MONITORING_JDBC_STATE_STORE.getAllPendingInstances() == null){
+    private void checkPendingInstanceAvailability() throws FalconException {
+        List<PendingInstanceBean> pendingInstanceBeans = 
MONITORING_JDBC_STATE_STORE.getAllPendingInstances();
+        if (pendingInstanceBeans.isEmpty()){
             LOG.info("No pending instances to be checked");
             return;
         }
-        for(PendingInstanceBean pendingInstanceBean : 
MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){
-            for (Date instanceTime : 
MONITORING_JDBC_STATE_STORE.getNominalInstances(
-                    pendingInstanceBean.getEntityName(), 
pendingInstanceBean.getClusterName(), entityType)) {
-                boolean status = 
checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(),
-                        pendingInstanceBean.getClusterName(), instanceTime, 
entityType);
-                if (status) {
-                    
MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getEntityName(),
-                            pendingInstanceBean.getClusterName(), 
instanceTime, EntityType.FEED.toString());
-                }
+        for(PendingInstanceBean pendingInstanceBean : pendingInstanceBeans){
+            boolean status = 
checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(),
+                    pendingInstanceBean.getClusterName(), 
pendingInstanceBean.getNominalTime(),
+                    pendingInstanceBean.getEntityType());
+            if (status) {
+                
MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getEntityName(),
+                        pendingInstanceBean.getClusterName(), 
pendingInstanceBean.getNominalTime(),
+                        pendingInstanceBean.getEntityType());
             }
         }
     }
@@ -434,21 +444,25 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
     private boolean checkEntityInstanceAvailability(String entityName, String 
clusterName, Date nominalTime,
                                                     String entityType) throws 
FalconException {
         Entity entity = EntityUtil.getEntity(entityType, entityName);
-        authenticateUser();
+        authenticateUser(entity);
         try {
-            if (entityType.equals(EntityType.PROCESS.toString())){
+            if (entityType.equalsIgnoreCase(EntityType.PROCESS.toString())){
                 LOG.trace("Checking instance availability status for 
entity:{}, cluster:{}, "
                         + "instanceTime:{}", entity.getName(), clusterName, 
nominalTime, entityType);
                 AbstractWorkflowEngine wfEngine = 
WorkflowEngineFactory.getWorkflowEngine();
-                InstancesResult instancesResult = wfEngine.getStatus(entity, 
nominalTime, nominalTime, null, null);
-                if 
(instancesResult.getStatus().equals(APIResult.Status.SUCCEEDED)){
-                    LOG.trace("Entity instance(Process:{}, cluster:{}, 
instanceTime:{}) is available.",
-                            entity.getName(), clusterName, nominalTime);
-                    return true;
+
+                InstancesResult instancesResult = wfEngine.getStatus(entity, 
nominalTime,
+                        new Date(nominalTime.getTime() + 200), 
PROCESS_LIFE_CYCLE, false);
+                if (instancesResult.getInstances().length > 0) {
+                    if 
(instancesResult.getInstances()[0].status.equals(InstancesResult.WorkflowStatus.SUCCEEDED)){
+                        LOG.trace("Entity instance(Process:{}, cluster:{}, 
instanceTime:{}) is available.",
+                                entity.getName(), clusterName, nominalTime);
+                        return true;
+                    }
                 }
                 return false;
             }
-            if (entityType.equals(EntityType.FEED.toString())){
+            if (entityType.equalsIgnoreCase(EntityType.FEED.toString())){
                 LOG.trace("Checking instance availability status for feed:{}, 
cluster:{}, instanceTime:{}",
                         entity.getName(), clusterName, nominalTime);
 
@@ -466,7 +480,7 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
                     clusterName, entityType, nominalTime, e);
         }
         LOG.debug("Entity instance(entity:{}, cluster:{}, instanceTime:{}) is 
not available.", entity.getName(),
-            clusterName, nominalTime);
+                clusterName, nominalTime);
         return false;
     }
 
@@ -509,7 +523,7 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
                 org.apache.falcon.entity.v0.process.Cluster cluster = 
ProcessHelper.getCluster(process,
                         entityClusterPair.second);
                 org.apache.falcon.entity.v0.process.Sla sla = 
ProcessHelper.getSLA(cluster, process);
-                if (sla != null){
+                if (sla != null && isEntityRunning(process)){
                     Set<Pair<Date, String>> slaStatus = 
getProcessSLAStatus(sla, start, end,
                             
MONITORING_JDBC_STATE_STORE.getNominalInstances(entityClusterPair.first,
                                     entityClusterPair.second, entityType));
@@ -659,9 +673,21 @@ public final class EntitySLAMonitoringService implements 
ConfigurationChangeList
     }
 
     // Authenticate user only if not already authenticated.
-    private void authenticateUser(){
+    private void authenticateUser(Entity entity){
         if (!CurrentUser.isAuthenticated()) {
-            CurrentUser.authenticate(System.getProperty("user.name"));
+            if (StringUtils.isNotBlank(entity.getACL().getOwner())) {
+                CurrentUser.authenticate(entity.getACL().getOwner());
+            } else {
+                CurrentUser.authenticate(System.getProperty("user.name"));
+            }
         }
     }
+
+
+    private boolean isEntityRunning(Entity entity) throws FalconException {
+        authenticateUser(entity);
+        AbstractWorkflowEngine workflowEngine = 
WorkflowEngineFactory.getWorkflowEngine();
+        return workflowEngine.isActive(entity) && 
!workflowEngine.isSuspended(entity)
+                && !workflowEngine.isCompleted(entity);
+    }
 }

Reply via email to