Repository: falcon
Updated Branches:
  refs/heads/master 90cb996f5 -> bfd1805ba
FALCON-2303 Backloginmins is not getting updated

Author: Praveen Adlakha <[email protected]>

Reviewers: @pallavi-rao

Closes #385 from PraveenAdlakha/backlogfix and squashes the following commits:

8ad078946 [Praveen Adlakha] comments addressed
182e31c64 [Praveen Adlakha] FALCON-2303 Backloginmins is not getting updated

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/bfd1805b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/bfd1805b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/bfd1805b

Branch: refs/heads/master
Commit: bfd1805ba13e7755f01754edd062da3dc59aae3b
Parents: 90cb996
Author: Praveen Adlakha <[email protected]>
Authored: Thu Aug 31 14:39:32 2017 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Thu Aug 31 14:39:32 2017 +0530

----------------------------------------------------------------------
 .../falcon/persistence/PendingInstanceBean.java | 5 +++--
 .../falcon/persistence/PersistenceConstants.java | 1 +
 .../apache/falcon/jdbc/MonitoringJdbcStateStore.java | 14 ++++++++++++++
 .../falcon/service/BacklogMetricEmitterService.java | 1 +
 .../apache/falcon/service/EntitySLAAlertService.java | 4 ++--
 .../falcon/service/EntitySLAMonitoringService.java | 14 +++++++++++---
 src/conf/runtime.properties | 3 +++
 7 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
index 05c5ea3..d35d982 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
+++ 
b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java @@ -42,8 +42,9 @@ import java.util.Date; @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"), @NamedQuery(name = PersistenceConstants.DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"), @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"), - @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a order by a.nominalTime asc"), - @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType") + @NamedQuery(name = PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a order by a.nominalTime asc"), + @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCE , query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"), + @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE, query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType and a.nominalTime >= :startTime and a.nominalTime < :endTime ") }) @Table(name = "PENDING_INSTANCES") //RESUME CHECKSTYLE CHECK 
LineLengthCheck http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index 1e6a04b..90dcf50 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -82,4 +82,5 @@ public final class PersistenceConstants { public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB"; public static final String GET_JOBS_FOR_AN_EXTENSION = "GET_JOBS_FOR_AN_EXTENSION"; public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES"; + public static final String GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE = "GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index 8da2389..51eac94 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -182,6 +182,20 @@ public class MonitoringJdbcStateStore { return result; } + public List<Date> getNominalInstancesBetweenTimeRange(String entityName, String clusterName, String entityType, + Date startTime, Date endTime) { + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_PENDING_INSTANCES_BETWEEN_TIME_RANGE); + 
q.setParameter(PendingInstanceBean.ENTITY_NAME, entityName); + q.setParameter(PendingInstanceBean.CLUSTER_NAME, clusterName); + q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase()); + q.setParameter("startTime", startTime); + q.setParameter("endTime", endTime); + List result = q.getResultList(); + entityManager.close(); + return result; + } + public List<PendingInstanceBean> getAllPendingInstances(){ EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES); http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java index 2480c96..50170b9 100644 --- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java +++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java @@ -117,6 +117,7 @@ public final class BacklogMetricEmitterService implements FalconService, } Process process = (Process) entity; if (process.getSla() != null) { + LOG.debug("Removing process:{} from monitoring", process.getName()); backlogMetricStore.deleteEntityBackLogInstances(entity.getName(), entity.getEntityType().name()); entityBacklogs.remove(entity); process = EntityUtil.getEntity(entity.getEntityType(), entity.getName()); http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java index 837a170..2f19e6b 100644 --- 
a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java @@ -112,7 +112,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){ return; } - LOG.trace("In processSLACandidates :" + pendingInstanceBeanList.size()); + LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size()); try{ for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) { @@ -129,7 +129,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList if (schedulableEntityInstances.isEmpty()){ store.deleteEntityAlertInstance(entityName, cluster.getName(), nominalTime, entityType); - return; + continue; } List<SchedulableEntityInstance> schedulableEntityList = new ArrayList<>(schedulableEntityInstances); SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0); http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index 09671d9..8b3dbe4 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -55,6 +55,7 @@ import org.apache.falcon.resource.SchedulableEntityInstance; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.util.DateUtil; import org.apache.falcon.util.DeploymentUtil; +import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowEngineFactory; import 
org.apache.falcon.workflow.engine.AbstractWorkflowEngine; @@ -207,9 +208,11 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList if (feed.getSla() != null && feed.getLocations() != null) { for (Cluster cluster : feed.getClusters().getClusters()) { if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { + LOG.debug("Removing feed:{} for monitoring", feed.getName()); MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(), EntityType.FEED.toString()); MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName(), EntityType.FEED.toString()); + LOG.debug("Removing feed:{} for monitoring", feed.getName()); } } } @@ -219,6 +222,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList if (process.getSla() != null){ for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { if (currentClusters.contains(cluster.getName())) { + LOG.debug("Removing feed:{} for monitoring", process.getName()); MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(process.getName(), EntityType.PROCESS.toString()); MONITORING_JDBC_STATE_STORE.deletePendingInstances(process.getName(), cluster.getName(), @@ -364,6 +368,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList // add Instances from last checked time to 10 minutes from now(some buffer for status check) Date newCheckPointTime = new Date(now().getTime() + lookAheadWindowMillis); addPendingEntityInstances(newCheckPointTime); + } else { + LOG.debug("No entities present for sla monitoring."); } } catch (Throwable e) { LOG.error("Feed SLA monitoring failed: ", e); @@ -450,7 +456,6 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList LOG.trace("Checking instance availability status for entity:{}, cluster:{}, " + "instanceTime:{}", entity.getName(), clusterName, nominalTime, entityType); AbstractWorkflowEngine 
wfEngine = WorkflowEngineFactory.getWorkflowEngine(); - InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime, new Date(nominalTime.getTime() + 200), PROCESS_LIFE_CYCLE, false); if (instancesResult.getInstances().length > 0) { @@ -459,6 +464,9 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList entity.getName(), clusterName, nominalTime); return true; } + } else if ((System.currentTimeMillis() - nominalTime.getTime())/(1000*60*60*24) >= Integer.parseInt( + RuntimeProperties.get().getProperty("workflow.history.expiration.period.days", "7"))) { + return true; } return false; } @@ -552,8 +560,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName, Date start, Date end, String entityType) throws FalconException { Set<SchedulableEntityInstance> result = new HashSet<>(); - List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName, - entityType); + List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstancesBetweenTimeRange(entityName, + clusterName, entityType, start, end); if (missingInstances == null){ return result; } http://git-wip-us.apache.org/repos/asf/falcon/blob/bfd1805b/src/conf/runtime.properties ---------------------------------------------------------------------- diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties index 013ac18..62d12d4 100644 --- a/src/conf/runtime.properties +++ b/src/conf/runtime.properties @@ -86,3 +86,6 @@ falcon.current.colo=local ### Timeout factor for processes ### instance.timeout.factor=5 + +### Workflow expiration period for oozie ### +workflow.history.expiration.period.days=7 \ No newline at end of file
