Repository: falcon Updated Branches: refs/heads/master 4746e039a -> aa522a548
FALCON-2170 Umbrella jira for bugs in EntitySLAMonitoring and Backlog⦠â¦Emitter service Author: Praveen Adlakha <[email protected]> Reviewers: @pallavi-rao Closes #289 from PraveenAdlakha/enity_fixes and squashes the following commits: 6b90812 [Praveen Adlakha] comments addressed d8d1571 [Praveen Adlakha] FALCON-2170 Umbrella jira for bugs in EntitySLAMonitoring and BacklogEmitter service Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aa522a54 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aa522a54 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aa522a54 Branch: refs/heads/master Commit: aa522a5489bed408573b375f897b8385d06e3cce Parents: 4746e03 Author: Praveen Adlakha <[email protected]> Authored: Mon Oct 24 12:22:34 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Oct 24 12:22:34 2016 +0530 ---------------------------------------------------------------------- .../org/apache/falcon/entity/EntityUtil.java | 89 +++++++++- .../metrics/MetricNotificationService.java | 11 ++ .../falcon/persistence/EntitySLAAlertBean.java | 10 +- .../falcon/persistence/MonitoredEntityBean.java | 39 +++-- .../falcon/persistence/PendingInstanceBean.java | 12 +- .../persistence/PersistenceConstants.java | 13 +- common/src/main/resources/startup.properties | 19 ++- .../apache/falcon/entity/EntityUtilTest.java | 30 ++++ .../twiki/BacklogMetricEmitterService.twiki | 2 + .../falcon/messaging/JMSMessageConsumer.java | 3 +- .../org/apache/falcon/logging/JobLogMover.java | 1 - .../apache/falcon/service/LogMoverService.java | 18 ++- .../workflow/engine/OozieWorkflowEngine.java | 5 +- .../apache/falcon/jdbc/BacklogMetricStore.java | 12 ++ .../falcon/jdbc/MonitoringJdbcStateStore.java | 123 ++++++-------- .../plugin/GraphiteNotificationPlugin.java | 1 - .../service/BacklogMetricEmitterService.java | 115 ++++++++++--- .../falcon/service/EntitySLAAlertService.java | 2 +- .../service/EntitySLAMonitoringService.java | 161 +++++++++---------- .../jdbc/MonitoringJdbcStateStoreTest.java | 31 +++- .../service/EntitySLAAlertServiceTest.java | 2 +- src/conf/startup.properties | 15 +- 22 files changed, 466 insertions(+), 248 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index 8fe316c..f3d5d28 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -1024,18 +1024,69 @@ public final class EntityUtil { */ public static List<Date> getEntityInstanceTimes(Entity entity, String clusterName, Date startRange, Date endRange) { Date start = null; + Date end = null; + switch (entity.getEntityType()) { case FEED: Feed feed = (Feed) entity; - start = FeedHelper.getCluster(feed, clusterName).getValidity().getStart(); + org.apache.falcon.entity.v0.feed.Validity feedValidity = + FeedHelper.getCluster(feed, clusterName).getValidity(); + start = feedValidity.getStart(); + end = feedValidity.getEnd().before(endRange) ? feedValidity.getEnd() : endRange; return getInstanceTimes(start, feed.getFrequency(), feed.getTimezone(), - startRange, endRange); + startRange, end); case PROCESS: + Process process = (Process) entity; - start = ProcessHelper.getCluster(process, clusterName).getValidity().getStart(); + org.apache.falcon.entity.v0.process.Validity processValidity = + ProcessHelper.getCluster(process, clusterName).getValidity(); + start = processValidity.getStart(); + end = processValidity.getEnd().before(endRange) ? processValidity.getEnd() : endRange; return getInstanceTimes(start, process.getFrequency(), + process.getTimezone(), startRange, end); + + default: + throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType()); + } + } + + /** + * Find the entity instance times in between the given time range. + * <p/> + * Both start and end Date are inclusive. + * + * @param entity feed or process entity whose instance times are to be found + * @param clusterName name of the cluster + * @param startRange start time for the input range + * @param endRange end time for the input range + * @return List of instance times in between the given time range + */ + public static List<Date> getEntityInstanceTimesInBetween(Entity entity, String clusterName, Date startRange, + Date endRange) { + Date start = null; + Date end = null; + + + switch (entity.getEntityType()) { + case FEED: + Feed feed = (Feed) entity; + org.apache.falcon.entity.v0.feed.Validity feedValidity = + FeedHelper.getCluster(feed, clusterName).getValidity(); + start = feedValidity.getStart(); + end = feedValidity.getEnd(); + return getInstancesInBetween(start, end, feed.getFrequency(), feed.getTimezone(), + startRange, endRange); + + case PROCESS: + Process process = (Process) entity; + org.apache.falcon.entity.v0.process.Validity processValidity = + ProcessHelper.getCluster(process, clusterName).getValidity(); + start = processValidity.getStart(); + end = processValidity.getEnd(); + + return getInstancesInBetween(start, end, process.getFrequency(), process.getTimezone(), startRange, endRange); default: @@ -1066,13 +1117,37 @@ public final class EntityUtil { Date current = getPreviousInstanceTime(startTime, frequency, timeZone, startRange); while (true) { - Date nextStartTime = getNextStartTime(startTime, frequency, timeZone, current); - if (nextStartTime.after(endRange)){ + Date nextInstanceTime = getNextStartTime(startTime, frequency, timeZone, current); + if (nextInstanceTime.after(endRange)){ break; } - result.add(nextStartTime); + result.add(nextInstanceTime); // this is required because getNextStartTime returns greater than or equal to referenceTime - current = new Date(nextStartTime.getTime() + ONE_MS); // 1 milli seconds later + current = new Date(nextInstanceTime.getTime() + ONE_MS); // 1 milli seconds later + } + return result; + } + + + public static List<Date> getInstancesInBetween(Date startTime, Date endTime, Frequency frequency, TimeZone timeZone, + Date startRange, Date endRange) { + List<Date> result = new LinkedList<>(); + if (endRange.before(startRange)) { + return result; + } + if (timeZone == null) { + timeZone = TimeZone.getTimeZone("UTC"); + } + Date current = getPreviousInstanceTime(startTime, frequency, timeZone, startRange); + while (true) { + if (!current.before(startRange) && !current.after(endRange) + && current.before(endTime) && !current.before(startTime)) { + result.add(current); + } + current = getNextInstanceTime(current, frequency, timeZone, 1); + if (current.after(endRange)){ + break; + } } return result; } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java b/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java index 30e6bb6..90fbfa9 100644 --- a/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java +++ b/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.Map; +import java.util.SortedMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -94,6 +95,16 @@ public class MetricNotificationService implements FalconService { } } + public void deleteMetric(String metricName){ + synchronized (this){ + SortedMap<String, Gauge> gaugeMap = metricRegistry.getGauges(); + if (gaugeMap.get(metricName) != null){ + metricRegistry.remove(metricName); + metricMap.remove(metricName); + } + } + } + private static class MetricGauge implements Gauge<Long> { private Long value=0L; http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java index 186c5e0..6482e8c 100644 --- a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java @@ -35,7 +35,7 @@ import javax.validation.constraints.NotNull; //SUSPEND CHECKSTYLE CHECK LineLengthCheck /** - * Entity SLA monitoring. + * Feed SLA monitoring. * */ @Entity @NamedQueries({ @@ -148,12 +148,12 @@ public class EntitySLAAlertBean { this.isSLAHighMissed = isSLAHighMissed; } - public static final String ENTITYNAME = "entityName"; + public static final String ENTITY_NAME = "entityName"; - public static final String CLUSTERNAME = "clusterName"; + public static final String CLUSTER_NAME = "clusterName"; - public static final String ENTITYTYPE = "entityType"; + public static final String ENTITY_TYPE = "entityType"; - public static final String NOMINALTIME = "nominalTime"; + public static final String NOMINAL_TIME = "nominalTime"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java index c620e45..5181cf5 100644 --- a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java @@ -29,22 +29,26 @@ import javax.persistence.Id; import javax.persistence.Column; import javax.persistence.Basic; import javax.validation.constraints.NotNull; +import java.util.Date; //SUSPEND CHECKSTYLE CHECK LineLengthCheck /** -* The Feeds that are to be monitered will be stored in the db. +* The Entities that are to be monitored will be stored in MONITORED_ENTITY table. * */ @Entity @NamedQueries({ - @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from " + @NamedQuery(name = PersistenceConstants.GET_MONITORED_ENTITY, query = "select OBJECT(a) from " + "MonitoredEntityBean a where a.entityName = :entityName and a.entityType = :entityType"), - @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredEntityBean " + @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_ENTITIES, query = "delete from MonitoredEntityBean " + "a where a.entityName = :entityName and a.entityType = :entityType"), - @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE, query = "select OBJECT(a) " + @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITIES_FOR_TYPE, query = "select OBJECT(a) " + "from MonitoredEntityBean a where a.entityType = :entityType"), @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITY, query = "select OBJECT(a) " - + "from MonitoredEntityBean a") + + "from MonitoredEntityBean a"), + @NamedQuery(name = PersistenceConstants.UPDATE_LAST_MONITORED_TIME, query = "update MonitoredEntityBean a " + + "set a.lastMonitoredTime = :lastMonitoredTime where a.entityName = :entityName and a.entityType = " + + ":entityType") }) @Table(name="MONITORED_ENTITY") //RESUME CHECKSTYLE CHECK LineLengthCheck @@ -73,12 +77,25 @@ public class MonitoredEntityBean { @Column(name = "entity_type") private String entityType; - public String getFeedName() { + public String getEntityName() { return entityName; } - public void setEntityName(String feedName) { - this.entityName = feedName; + public void setEntityName(String entityName) { + this.entityName = entityName; + } + + @Basic + @NotNull + @Column(name = "last_monitored_time") + private Date lastMonitoredTime; + + public Date getLastMonitoredTime() { + return lastMonitoredTime; + } + + public void setLastMonitoredTime(Date lastMonitoredTime) { + this.lastMonitoredTime = lastMonitoredTime; } public String getId() { @@ -89,8 +106,10 @@ public class MonitoredEntityBean { this.id = id; } - public static final String ENTITYNAME = "entityName"; + public static final String ENTITY_NAME = "entityName"; + + public static final String ENTITY_TYPE = "entityType"; - public static final String ENTITYTYPE = "entityType"; + public static final String LAST_MONITORED_TIME = "lastMonitoredTime"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java index 43b6b8e..05c5ea3 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java @@ -40,9 +40,9 @@ import java.util.Date; @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"), @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"), @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"), - @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"), + @NamedQuery(name = PersistenceConstants.DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"), @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"), - @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a "), + @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a order by a.nominalTime asc"), @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType") }) @Table(name = "PENDING_INSTANCES") @@ -114,12 +114,12 @@ public class PendingInstanceBean { this.entityName = entityName; } - public static final String ENTITYNAME = "entityName"; + public static final String ENTITY_NAME = "entityName"; - public static final String CLUSTERNAME = "clusterName"; + public static final String CLUSTER_NAME = "clusterName"; - public static final String NOMINALTIME = "nominalTime"; + public static final String NOMINAL_TIME = "nominalTime"; - public static final String ENTITYTYPE = "entityType"; + public static final String ENTITY_TYPE = "entityType"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index 5c3de51..8be0eb5 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -24,13 +24,16 @@ public final class PersistenceConstants { private PersistenceConstants(){ } - public static final String GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE"; - public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES"; - public static final String GET_ALL_MONITORING_ENTITY_FOR_TYPE = "GET_ALL_MONITORING_ENTITY_FOR_TYPE"; + public static final String GET_MONITORED_ENTITY = "GET_MONITORED_ENTITY"; + public static final String DELETE_MONITORED_ENTITIES = "DELETE_MONITORED_ENTITIES"; + public static final String GET_ALL_MONITORING_ENTITIES_FOR_TYPE = "GET_ALL_MONITORING_ENTITIES_FOR_TYPE"; + public static final String GET_ALL_MONITORING_ENTITY = "GET_ALL_MONITORING_ENTITY"; + public static final String UPDATE_LAST_MONITORED_TIME = "UPDATE_LAST_MONITORED_TIME"; + public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES"; public static final String GET_PENDING_INSTANCE = "GET_PENDING_INSTANCE"; public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES"; - public static final String DELETE_ALL_INSTANCES_FOR_ENTITY = "DELETE_ALL_INSTANCES_FOR_ENTITY"; + public static final String DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY = "DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY"; public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES"; public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES"; public static final String GET_ENTITY = "GET_ENTITY"; @@ -63,5 +66,5 @@ public final class PersistenceConstants { public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE"; public static final String DELETE_BACKLOG_METRIC_INSTANCE = "DELETE_BACKLOG_METRIC_INSTANCE"; public static final String GET_ALL_BACKLOG_INSTANCES = "GET_ALL_BACKLOG_INSTANCES"; - public static final String GET_ALL_MONITORING_ENTITY = "GET_ALL_MONITORING_ENTITY"; + public static final String DELETE_ALL_BACKLOG_ENTITY_INSTANCES ="DELETE_ALL_BACKLOG_ENTITY_INSTANCES"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 3beab62..9fb1c0a 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -69,6 +69,10 @@ org.apache.falcon.entity.store.FeedLocationStore,\ org.apache.falcon.service.EntitySLAMonitoringService,\ org.apache.falcon.service.SharedLibraryHostingService + +## if you wish to use BacklogEmitterService please add BackLogEmitter service as a configstore listners.## +# org.apache.falcon.service.BacklogMetricEmitterService + ## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ## # org.apache.falcon.state.store.jdbc.JdbcStateStore @@ -83,7 +87,9 @@ ##### Workflow Job Execution Completion listeners ##### *.workflow.execution.listeners= +#org.apache.falcon.handler.SLAMonitoringHandler #org.apache.falcon.service.LogMoverService +#org.apache.falcon.service.BacklogMetricEmitterService ######### Implementation classes ######### @@ -244,15 +250,6 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle # Authorization Enabled flag: false (default)|true *.falcon.security.authorization.enabled=false -# CSRF filter enabled flag: false (default) | true -*.falcon.security.csrf.enabled=false - -# Custom header for CSRF filter -*.falcon.security.csrf.header=FALCON-CSRF-FILTER - -# Browser user agents to be filtered -*.falcon.security.csrf.browser=^Mozilla.*,^Opera.* - # The name of the group of super-users *.falcon.security.authorization.superusergroup=falcon @@ -351,5 +348,7 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle *.falcon.postprocessing.enable=true ### LogMoveService Properties -*.falcon.logMoveService.threadCount=200 +*.falcon.logMoveService.max.threadCount=200 *.falcon.logMoveService.blockingQueue.length=50 +##Note min threadCount should always be smaller than max threadCount. +*.falcon.logMoveService.min.threadCount=20 http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java index 42ae3e6..28a9270 100644 --- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java +++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java @@ -44,6 +44,7 @@ import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.TimeZone; @@ -81,6 +82,35 @@ public class EntityUtilTest extends AbstractTestBase { view = EntityUtil.getClusterView(feed, "backupCluster"); Assert.assertEquals(view.getClusters().getClusters().size(), 2); } + @Test + public void testGetInstancesInBetween(){ + Date startTime = SchemaHelper.parseDateUTC("2016-09-30T15:24Z"); + Date endTime = SchemaHelper.parseDateUTC("2016-09-30T17:04Z"); + Frequency frequency = new Frequency("minutes(5)"); + Date startRange = SchemaHelper.parseDateUTC("2016-09-30T15:25Z"); + Date endRange = SchemaHelper.parseDateUTC("2016-09-30T15:30Z"); + List<Date> instances = EntityUtil.getInstancesInBetween(startTime, endTime, frequency, tz, startRange, + endRange); + startRange = SchemaHelper.parseDateUTC("2016-09-30T15:18Z"); + endRange = SchemaHelper.parseDateUTC("2016-09-30T15:24Z"); + instances.addAll(EntityUtil.getInstancesInBetween(startTime, endTime, frequency, tz, startRange, endRange)); + Assert.assertEquals(instances.size(), 2); + startRange = SchemaHelper.parseDateUTC("2016-09-30T15:24Z"); + endRange = SchemaHelper.parseDateUTC("2016-09-30T15:25Z"); + instances = EntityUtil.getInstancesInBetween(startTime, endTime, frequency, tz, startRange, endRange); + Assert.assertEquals(instances.size(), 1); + + frequency = new Frequency("minutes(2)"); + startRange = SchemaHelper.parseDateUTC("2016-09-30T16:32Z"); + endRange = SchemaHelper.parseDateUTC("2016-09-30T17:02Z"); + instances = EntityUtil.getInstancesInBetween(startTime, endTime, frequency, tz, startRange, endRange); + Assert.assertEquals(instances.size(), 16); + startRange = SchemaHelper.parseDateUTC("2016-09-30T15:24Z"); + endRange = SchemaHelper.parseDateUTC("2016-09-30T17:05Z"); + instances = EntityUtil.getInstancesInBetween(startTime, endTime, frequency, tz, startRange, endRange); + Assert.assertEquals(instances.size(), 50); + + } @Test public void testEquals() throws Exception { http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/docs/src/site/twiki/BacklogMetricEmitterService.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/BacklogMetricEmitterService.twiki b/docs/src/site/twiki/BacklogMetricEmitterService.twiki index 2b10f6c..f92b594 100644 --- a/docs/src/site/twiki/BacklogMetricEmitterService.twiki +++ b/docs/src/site/twiki/BacklogMetricEmitterService.twiki @@ -41,6 +41,8 @@ Following services and listeners should be enabled for Backlog Emitter Service i # org.apache.falcon.service.BacklogMetricEmitterService *.entityAlert.listeners=org.apache.falcon.service.BacklogMetricEmitterService + +*.workflow.execution.listeners=org.apache.falcon.handler.SLAMonitoringHandler </verbatim> http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java index 8b48e93..5383e7f 100644 --- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java +++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java @@ -157,7 +157,8 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener { wfProperties.put(WorkflowExecutionArgs.ENTITY_TYPE, entityTypePair.second.name()); wfProperties.put(WorkflowExecutionArgs.WORKFLOW_USER, message.getStringProperty("user")); wfProperties.put(WorkflowExecutionArgs.OPERATION, getOperation(appName).name()); - + wfProperties.put(WorkflowExecutionArgs.USER_SUBFLOW_ID, + json.getString("id").concat("@user-action")); String appType = message.getStringProperty("appType"); return WorkflowExecutionContext.create(wfProperties, WorkflowExecutionContext.Type.valueOf(appType)); http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java index 6ec2a20..72c3dc5 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java +++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java @@ -92,7 +92,6 @@ public class JobLogMover { LOG.error("Error getting jobinfo for: {}", context.getUserSubflowId(), e); return 0; } - //Assumption is - Each wf run will have a directory //the corresponding job logs are stored within the respective dir Path path = new Path(context.getLogDir() + "/" http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java index 7d1425a..7e4640e 100644 --- a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java +++ b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java @@ -46,14 +46,24 @@ public class LogMoverService implements WorkflowExecutionListener { private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(Integer.parseInt( StartupProperties.get().getProperty("falcon.logMoveService.blockingQueue.length", "50"))); - private ExecutorService executorService = new ThreadPoolExecutor(20, getThreadCount(), 120, + private ExecutorService executorService = new ThreadPoolExecutor(getCorePoolSize(), getThreadCount(), 120, TimeUnit.SECONDS, blockingQueue); + + public int getCorePoolSize(){ + try{ + return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.min.threadCount", "20")); + } catch (NumberFormatException e){ + LOG.error("Exception in LogMoverService", e); + return 20; + } + } public int getThreadCount() { try{ - return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount", "200")); + return Integer.parseInt(StartupProperties.get() + .getProperty("falcon.logMoveService.max.threadCount", "200")); } catch (NumberFormatException e){ LOG.error("Exception in LogMoverService", e); - return 50; + return 200; } } @@ -86,7 +96,7 @@ public class LogMoverService implements WorkflowExecutionListener { if (Boolean.parseBoolean(ENABLE_POSTPROCESSING)) { return; } - while(0<blockingQueue.remainingCapacity()){ + while(blockingQueue.remainingCapacity()<=0){ try { LOG.trace("Sleeping, no capacity in threadpool...."); TimeUnit.MILLISECONDS.sleep(500); http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 6964200..06e4cb2 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -589,10 +589,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { @Override public InstancesResult ignoreInstances(Entity entity, Date start, Date end, Properties props, - List<LifeCycle> lifeCycles) throws FalconException { + List<LifeCycle> lifeCycles) throws FalconException { return doJobAction(JobAction.IGNORE, entity, start, end, props, lifeCycles); } - @Override public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles, @@ -1111,7 +1110,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { if (CoordinatorAction.Status.READY.toString().equals(status)) { return InstancesResult.WorkflowStatus.READY.name(); } else if (CoordinatorAction.Status.WAITING.toString().equals(status) - || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) { + || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) { return InstancesResult.WorkflowStatus.WAITING.name(); } else if (CoordinatorAction.Status.KILLED.toString().equals(status)) { return InstancesResult.WorkflowStatus.KILLED.name(); http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java index ef9a396..621974d 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java @@ -77,6 +77,18 @@ public class BacklogMetricStore { } } + public void deleteEntityInstance(String entityName){ + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES); + q.setParameter("entityName", entityName); + try{ + q.executeUpdate(); + } finally { + commitAndCloseTransaction(entityManager); + } + } + private void beginTransaction(EntityManager entityManager) { entityManager.getTransaction().begin(); http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index c479940..552ebde 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -44,11 +44,12 @@ public class MonitoringJdbcStateStore { } - public void putMonitoredEntity(String entityName, String entityType) throws FalconException{ + public void putMonitoredEntity(String entityName, String entityType, Date lastMonitoredTime) throws FalconException{ MonitoredEntityBean monitoredEntityBean = new MonitoredEntityBean(); monitoredEntityBean.setEntityName(entityName); monitoredEntityBean.setEntityType(entityType); + monitoredEntityBean.setLastMonitoredTime(lastMonitoredTime); EntityManager entityManager = getEntityManager(); try { beginTransaction(entityManager); @@ -58,11 +59,25 @@ public class MonitoringJdbcStateStore { } } + public void updateLastMonitoredTime(String entityName, String entityType, Date lastCheckedTime) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery(PersistenceConstants.UPDATE_LAST_MONITORED_TIME); + q.setParameter(MonitoredEntityBean.ENTITY_NAME, entityName); + q.setParameter(MonitoredEntityBean.ENTITY_TYPE, entityType.toLowerCase()); + q.setParameter(MonitoredEntityBean.LAST_MONITORED_TIME, lastCheckedTime); + try{ + q.executeUpdate(); + } finally { + commitAndCloseTransaction(entityManager); + } + } + public MonitoredEntityBean getMonitoredEntity(String entityName, String entityType){ EntityManager entityManager = getEntityManager(); - Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE); - q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName); - q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType.toLowerCase()); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITORED_ENTITY); + q.setParameter(MonitoredEntityBean.ENTITY_NAME, entityName); + q.setParameter(MonitoredEntityBean.ENTITY_TYPE, entityType.toLowerCase()); List result = q.getResultList(); try { if (result.isEmpty()) { @@ -77,9 +92,9 @@ public class MonitoringJdbcStateStore { public void deleteMonitoringEntity(String entityName, String entityType) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); - Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES); - q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName); - q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType.toLowerCase()); + Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_ENTITIES); + q.setParameter(MonitoredEntityBean.ENTITY_NAME, entityName); + q.setParameter(MonitoredEntityBean.ENTITY_TYPE, entityType.toLowerCase()); try{ q.executeUpdate(); } finally { @@ -87,7 +102,7 @@ public class MonitoringJdbcStateStore { } } - public List<MonitoredEntityBean> getAllMonitoredEntity() throws ResultNotFoundException { + public List<MonitoredEntityBean> getAllMonitoredEntities() throws ResultNotFoundException { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY); List result = q.getResultList(); @@ -95,10 +110,10 @@ public class MonitoringJdbcStateStore { return result; } - public List<MonitoredEntityBean> getAllMonitoredEntityForEntity(String entityType) throws ResultNotFoundException { + public List<MonitoredEntityBean> getAllMonitoredEntities(String entityType) throws ResultNotFoundException { EntityManager entityManager = getEntityManager(); - Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITIES_FOR_TYPE); + q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase()); List result = q.getResultList(); entityManager.close(); return result; @@ -107,8 +122,8 @@ public class MonitoringJdbcStateStore { public Date getLastInstanceTime(String entityName , String entityType) throws ResultNotFoundException { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_LATEST_INSTANCE_TIME, Date.class); - q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); + q.setParameter(PendingInstanceBean.ENTITY_NAME, entityName); + q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase()); Date result = (Date)q.getSingleResult(); entityManager.close(); return result; @@ -118,10 +133,10 @@ public class MonitoringJdbcStateStore { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES); - q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); - q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); - q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); + q.setParameter(PendingInstanceBean.ENTITY_NAME, entityName); + q.setParameter(PendingInstanceBean.CLUSTER_NAME, clusterName); + q.setParameter(PendingInstanceBean.NOMINAL_TIME, nominalTime); + q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase()); try{ q.executeUpdate(); } finally { @@ -132,10 +147,10 @@ public class MonitoringJdbcStateStore { public void deletePendingInstances(String entityName, String clusterName, String entityType){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); - Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY); - q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); - q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); + Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY); + q.setParameter(PendingInstanceBean.ENTITY_NAME, entityName); + q.setParameter(PendingInstanceBean.CLUSTER_NAME, clusterName); + q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase()); try{ q.executeUpdate(); } finally { @@ -160,9 +175,9 @@ public class MonitoringJdbcStateStore { public List<Date> getNominalInstances(String entityName, String clusterName, String entityType) { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES); - q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); - q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); + q.setParameter(PendingInstanceBean.ENTITY_NAME, entityName); + q.setParameter(PendingInstanceBean.CLUSTER_NAME, clusterName); + q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase()); List result = q.getResultList(); entityManager.close(); return result; @@ -188,34 +203,16 @@ public class MonitoringJdbcStateStore { entityManager.close(); } - public PendingInstanceBean getPendingInstance(String entityName, String clusterName, Date nominalTime, - String entityType) { - EntityManager entityManager = getEntityManager(); - beginTransaction(entityManager); - TypedQuery<PendingInstanceBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_PENDING_INSTANCE, - PendingInstanceBean.class); - q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); - - q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); - q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime); - q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase()); - try { - return q.getSingleResult(); - } finally { - commitAndCloseTransaction(entityManager); - } - } - public EntitySLAAlertBean getEntityAlertInstance(String entityName, String clusterName, Date nominalTime, String entityType) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); TypedQuery<EntitySLAAlertBean> q = entityManager.createNamedQuery(PersistenceConstants. GET_ENTITY_ALERT_INSTANCE, EntitySLAAlertBean.class); - q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName); - q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName); - q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime); - q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase()); + q.setParameter(EntitySLAAlertBean.ENTITY_NAME, entityName); + q.setParameter(EntitySLAAlertBean.CLUSTER_NAME, clusterName); + q.setParameter(EntitySLAAlertBean.NOMINAL_TIME, nominalTime); + q.setParameter(EntitySLAAlertBean.ENTITY_TYPE, entityType.toLowerCase()); try { return q.getSingleResult(); } finally { @@ -245,10 +242,10 @@ public class MonitoringJdbcStateStore { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.UPDATE_SLA_HIGH); - q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName); - q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName); - q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime); - q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase()); + q.setParameter(EntitySLAAlertBean.ENTITY_NAME, entityName); + q.setParameter(EntitySLAAlertBean.CLUSTER_NAME, clusterName); + q.setParameter(EntitySLAAlertBean.NOMINAL_TIME, nominalTime); + q.setParameter(EntitySLAAlertBean.ENTITY_TYPE, entityType.toLowerCase()); try{ q.executeUpdate(); } finally { @@ -260,10 +257,10 @@ public class MonitoringJdbcStateStore { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ENTITY_ALERT_INSTANCE); - q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName); - q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName); - q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime); - q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase()); + q.setParameter(EntitySLAAlertBean.ENTITY_NAME, entityName); + q.setParameter(EntitySLAAlertBean.CLUSTER_NAME, clusterName); + q.setParameter(EntitySLAAlertBean.NOMINAL_TIME, nominalTime); + q.setParameter(EntitySLAAlertBean.ENTITY_TYPE, entityType.toLowerCase()); try{ q.executeUpdate(); } finally { @@ -271,24 +268,6 @@ public class MonitoringJdbcStateStore { } } - - public List<EntitySLAAlertBean> getSLAHighCandidates() { - EntityManager entityManager = getEntityManager(); - beginTransaction(entityManager); - Query q = entityManager.createNamedQuery(PersistenceConstants.GET_SLA_HIGH_CANDIDATES); - List result = q.getResultList(); - - try { - if (CollectionUtils.isEmpty(result)) { - return null; - } - } finally{ - entityManager.close(); - } - return result; - } - - private void beginTransaction(EntityManager entityManager) { entityManager.getTransaction().begin(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java index 881f8ce..56df23b 100644 --- a/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java +++ b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java @@ -50,7 +50,6 @@ public class GraphiteNotificationPlugin implements MonitoringPlugin { String entityName = StringUtils.isNotBlank(message.getDimensions().get("entityName")) ? message.getDimensions().get("entityName") :message.getDimensions().get("entity-name"); String prefix = StartupProperties.get().getProperty("falcon.graphite.prefix"); - String separator = "."; LOG.debug("message:" + message.getAction()); if (entityType.equalsIgnoreCase(EntityType.PROCESS.name()) && ConfigurationStore.get().get(EntityType.PROCESS, entityName) != null) { http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java index d9ac386..3aa2155 100644 --- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java +++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java @@ -24,6 +24,7 @@ import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.jdbc.BacklogMetricStore; import org.apache.falcon.metrics.MetricNotificationService; @@ -60,9 +61,9 @@ import static org.apache.falcon.workflow.WorkflowEngineFactory.getWorkflowEngine * Backlog Metric Emitter Service to publish metrics to Graphite. */ public final class BacklogMetricEmitterService implements FalconService, - EntitySLAListener, WorkflowExecutionListener { + EntitySLAListener, WorkflowExecutionListener, ConfigurationChangeListener { - private static final String METRIC_PREFIX = "falcon"; + private static final String METRIC_PREFIX = StartupProperties.get().getProperty("falcon.graphite.prefix"); private static final String METRIC_SEPARATOR = "."; private static final String BACKLOG_METRIC_EMIT_INTERVAL = "falcon.backlog.metricservice.emit.interval.millisecs"; private static final String BACKLOG_METRIC_RECHECK_INTERVAL = "falcon.backlog.metricservice." @@ -101,9 +102,63 @@ public final class BacklogMetricEmitterService implements FalconService, private static ConcurrentHashMap<Entity, List<MetricInfo>> entityBacklogs = new ConcurrentHashMap<>(); @Override - public void highSLAMissed(String entityName, String clusterName, EntityType entityType, Date nominalTime) - throws FalconException { + public void onAdd(Entity entity) throws FalconException{ + //DO Nothing + } + @Override + public void onRemove(Entity entity) throws FalconException{ + if (entity.getEntityType() != EntityType.PROCESS){ + return; + } + backlogMetricStore.deleteEntityInstance(entity.getName()); + entityBacklogs.remove(entity); + Process process = EntityUtil.getEntity(entity.getEntityType(), entity.getName()); + for(Cluster cluster : process.getClusters().getClusters()){ + dropMetric(cluster.getName(), process); + } + } + + public void dropMetric(String clusterName, Process process){ + String pipelinesStr = process.getPipelines(); + String metricName; + + if (pipelinesStr != null && !pipelinesStr.isEmpty()) { + String[] pipelines = pipelinesStr.split(","); + for (String pipeline : pipelines) { + metricName = getMetricName(clusterName, process.getName(), pipeline); + metricNotificationService.deleteMetric(metricName); + } + } else { + metricName = getMetricName(clusterName, process.getName(), DEFAULT_PIPELINE); + metricNotificationService.deleteMetric(metricName); + } + } + + @Override + public void onChange(Entity oldEntity, Entity newEntity) throws FalconException{ + if (oldEntity.getEntityType() != EntityType.PROCESS){ + return; + } + Process newProcess = (Process) newEntity; + if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null){ + backlogMetricStore.deleteEntityInstance(newProcess.getName()); + entityBacklogs.remove(newProcess); + Process process = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName()); + for(Cluster cluster : process.getClusters().getClusters()){ + dropMetric(cluster.getName(), process); + } + } + } + + @Override + public void onReload(Entity entity) throws FalconException{ + // Do Nothing + } + + @Override + public void highSLAMissed(String entityName, String clusterName, EntityType entityType, + Date nominalTime) throws FalconException { if (entityType != EntityType.PROCESS) { return; } @@ -146,7 +201,7 @@ public final class BacklogMetricEmitterService implements FalconService, List<MetricInfo> metricsInDB = entry.getValue(); List<MetricInfo> metricInfoList = Collections.synchronizedList(metricsInDB); entityBacklogs.put(entry.getKey(), metricInfoList); - LOG.debug("Backlog of entity " + entry.getKey().getName() + " for instances " + metricInfoList); + LOG.debug("Initializing backlog for entity " + entry.getKey().getName()); } } } @@ -172,6 +227,7 @@ public final class BacklogMetricEmitterService implements FalconService, metrics.remove(new MetricInfo(DATE_FORMAT.get().format(date), context.getClusterName())); if (metrics.isEmpty()) { entityBacklogs.remove(entity); + publishBacklog((Process) entity, context.getClusterName(), 0L); } } } @@ -205,7 +261,7 @@ public final class BacklogMetricEmitterService implements FalconService, @Override public void run() { - LOG.debug("Starting periodic check for backlog"); + LOG.debug("BacklogMetricEmitter running for entities"); executor = new ScheduledThreadPoolExecutor(10); List<Future> futures = new ArrayList<>(); try { @@ -271,31 +327,38 @@ public final class BacklogMetricEmitterService implements FalconService, if (backLogsCluster != null && !backLogsCluster.isEmpty()) { for (Map.Entry<String, Long> entry : backLogsCluster.entrySet()) { String clusterName = entry.getKey(); - String pipelinesStr = process.getPipelines(); - String metricName; Long backlog = entry.getValue() / (60 * 1000L); // Converting to minutes - if (pipelinesStr != null && !pipelinesStr.isEmpty()) { - String[] pipelines = pipelinesStr.split(","); - for (String pipeline : pipelines) { - metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR - + pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name() - + METRIC_SEPARATOR + entityObj.getName() + METRIC_SEPARATOR - + "backlogInMins"; - metricNotificationService.publish(metricName, backlog); - } - } else { - metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR - + DEFAULT_PIPELINE + METRIC_SEPARATOR + LifeCycle.EXECUTION.name() - + METRIC_SEPARATOR + entityObj.getName() + METRIC_SEPARATOR - + "backlogInMins"; - metricNotificationService.publish(metricName, backlog); - } + publishBacklog(process, clusterName, backlog); } } } } + public static void publishBacklog(Process process, String clusterName, Long backlog){ + String pipelinesStr = process.getPipelines(); + String metricName; + + if (pipelinesStr != null && !pipelinesStr.isEmpty()) { + String[] pipelines = pipelinesStr.split(","); + for (String pipeline : pipelines) { + metricName = getMetricName(clusterName, process.getName(), pipeline); + metricNotificationService.publish(metricName, backlog); + } + } else { + metricName = getMetricName(clusterName, process.getName(), DEFAULT_PIPELINE); + metricNotificationService.publish(metricName, backlog); + } + } + + public static String getMetricName(String clusterName, String processName, String pipeline){ + String metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR + + pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name() + + METRIC_SEPARATOR + processName + METRIC_SEPARATOR + + "backlogInMins"; + return metricName; + } + /** * Service runs periodically and removes succeeded instances from backlog list. */ @@ -303,7 +366,7 @@ public final class BacklogMetricEmitterService implements FalconService, @Override public void run() { - LOG.debug("BacklogCheckService running for entities"); + LOG.trace("BacklogCheckService running for entities"); try { AbstractWorkflowEngine wfEngine = getWorkflowEngine(); for (Entity entity : entityBacklogs.keySet()) { @@ -331,7 +394,7 @@ public final class BacklogMetricEmitterService implements FalconService, if (status.getInstances().length > 0 && status.getInstances()[0].status == InstancesResult. WorkflowStatus.SUCCEEDED) { - LOG.debug("Instance of nominaltime {} of entity {} was succeeded, removing " + LOG.debug("Instance of nominaltime {} of entity {} has succeeded, removing " + "from backlog entries", nominalTimeStr, entity.getName()); backlogMetricStore.deleteMetricInstance(entity.getName(), metricInfo.getCluster(), nominalTime, entity.getEntityType()); http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java index 09c6695..c4069dd 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java @@ -142,7 +142,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList LOG.info("Entity : {} Cluster : {} Nominal Time : {} missed SLALow", entityName, entityType, clusterName, nominalTime); } else if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_CRITICAL)){ - if (entityType.equals(EntityType.PROCESS.name())){ + if (entityType.equalsIgnoreCase(EntityType.PROCESS.name())){ store.putSLAAlertInstance(entityName, clusterName, entityType, nominalTime, true, false); } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index 816846d..7ff9309 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -17,23 +17,22 @@ */ package org.apache.falcon.service; +import com.google.common.annotations.VisibleForTesting; import java.text.ParseException; -import java.util.HashSet; import java.util.ArrayList; import java.util.Date; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.Pair; -import org.apache.falcon.entity.FeedInstanceStatus; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.FeedInstanceStatus; import org.apache.falcon.entity.ProcessHelper; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; @@ -42,6 +41,7 @@ import org.apache.falcon.entity.v0.feed.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Sla; import org.apache.falcon.entity.v0.process.Clusters; +import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.jdbc.MonitoringJdbcStateStore; @@ -60,12 +60,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.falcon.entity.v0.process.Process; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; +import static org.apache.falcon.entity.EntityUtil.getStartTime; +import static org.apache.falcon.util.DateUtil.now; /** * Service to monitor Feed SLAs. @@ -81,6 +80,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList public static final String TAG_CRITICAL = "Missed-SLA-High"; public static final String TAG_WARN = "Missed-SLA-Low"; + private static final long MINUTE_DELAY = 60000L; private EntitySLAMonitoringService() { @@ -128,30 +128,36 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList @Override public void onAdd(Entity entity) throws FalconException { Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); + Set<String> clustersDefined = EntityUtil.getClustersDefined(entity); if (entity.getEntityType() == EntityType.FEED) { Feed feed = (Feed) entity; // currently sla service is enabled only for fileSystemStorage if (feed.getLocations() != null || feed.getSla() != null || checkFeedClusterSLA(feed)) { - for (Cluster cluster : feed.getClusters().getClusters()) { - if (currentClusters.contains(cluster.getName())) { + for (String cluster : clustersDefined) { + if (currentClusters.contains(cluster)) { if (FeedHelper.getSLA(cluster, feed) != null) { LOG.debug("Adding feed:{} for monitoring", feed.getName()); - MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), EntityType.FEED.toString()); - break; + MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), EntityType.FEED.toString(), + new Date(now().getTime() + MINUTE_DELAY)); + List<Date> instances = EntityUtil.getEntityInstanceTimesInBetween(entity, cluster, + getStartTime(entity, cluster), now()); + addPendingInstances(entity.getEntityType().name().toLowerCase(), entity, cluster, + instances); } } } } - } - if (entity.getEntityType() == EntityType.PROCESS){ + } else if (entity.getEntityType() == EntityType.PROCESS) { Process process = (Process) entity; - if (process.getSla() != null || checkProcessClusterSLA(process)){ - for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { - if (currentClusters.contains(cluster.getName())) { + if (process.getSla() != null || checkProcessClusterSLA(process)) { + for (String cluster : clustersDefined) { + if (currentClusters.contains(cluster)) { LOG.debug("Adding process:{} for monitoring", process.getName()); MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(), - EntityType.PROCESS.toString()); - break; + EntityType.PROCESS.toString(), new Date(now().getTime() + MINUTE_DELAY)); + List<Date> instances = EntityUtil.getEntityInstanceTimesInBetween(entity, cluster, + getStartTime(entity, cluster), now()); + addPendingInstances(entity.getEntityType().name().toLowerCase(), entity, cluster, instances); } } } @@ -186,7 +192,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList if (entity.getEntityType() == EntityType.FEED) { Feed feed = (Feed) entity; // currently sla service is enabled only for fileSystemStorage - if (feed.getLocations() != null) { + if (feed.getSla() != null && feed.getLocations() != null) { for (Cluster cluster : feed.getClusters().getClusters()) { if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) { MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(), EntityType.FEED.toString()); @@ -257,9 +263,9 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList if (newEntity.getEntityType() == EntityType.PROCESS) { Process oldProcess = (Process) oldEntity; Process newProcess = (Process) newEntity; - if (!isSLAMonitoringEnabledInCurrentColo(oldProcess)){ - onRemove(newProcess); - } else if (!isSLAMonitoringEnabledInCurrentColo(newProcess)){ + if (!isSLAMonitoringEnabledInCurrentColo(newProcess)){ + onRemove(oldProcess); + } else if (!isSLAMonitoringEnabledInCurrentColo(oldProcess)){ onAdd(newProcess); } else { List<String> slaRemovedClusters = new ArrayList<>(); @@ -283,7 +289,6 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList @Override public void onReload(Entity entity) throws FalconException { - onAdd(entity); } @Override @@ -305,6 +310,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList lookAheadWindowMillis = Integer.parseInt(freq); LOG.info("Initializing EntitySLAMonitoringService from ", filePath.toString()); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + addPendingEntityInstances(EntityType.FEED.name(), null, now()); + addPendingEntityInstances(EntityType.PROCESS.name(), null, now()); executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS); } @@ -345,15 +352,14 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList @Override public void run() { try { - if (MONITORING_JDBC_STATE_STORE.getAllMonitoredEntity().size() > 0) { + if (MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities().size() > 0) { checkPendingInstanceAvailability(EntityType.FEED.toString()); checkPendingInstanceAvailability(EntityType.PROCESS.toString()); // add Instances from last checked time to 10 minutes from now(some buffer for status check) - Date now = new Date(); - Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis); - addNewPendingEntityInstances(newCheckPoint, EntityType.FEED.toString()); - addNewPendingEntityInstances(newCheckPoint, EntityType.PROCESS.toString()); + Date newCheckPointTime = new Date(now().getTime() + lookAheadWindowMillis); + addPendingEntityInstances(EntityType.FEED.toString(), null, newCheckPointTime); + addPendingEntityInstances(EntityType.PROCESS.toString(), null, newCheckPointTime); } } catch (Throwable e) { LOG.error("Feed SLA monitoring failed: ", e); @@ -361,55 +367,42 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList } } + private void addPendingInstances(String entityType, Entity entity, + String clusterName, + List<Date> instances) throws FalconException { + if (instances != null && !instances.isEmpty()) { + for (Date date : instances) { + LOG.debug("Adding pending instance ={} for entity= {} in cluster>={} and entityType={}", date, + entity.getName(), clusterName, entityType); + MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), clusterName, date, + entityType); + } + } + } - void addNewPendingEntityInstances(Date to, String entityType) throws FalconException { + void addPendingEntityInstances(String entityType, Date startTime, Date endTime) throws FalconException { Set<String> currentClusters = DeploymentUtil.getCurrentClusters(); List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE. - getAllMonitoredEntityForEntity(entityType); + getAllMonitoredEntities(entityType); for(MonitoredEntityBean monitoredEntityBean : entityBeanList) { - String entityName = monitoredEntityBean.getFeedName(); + String entityName = monitoredEntityBean.getEntityName(); + Date lastMonitoredInstanceTime = (startTime != null) ? startTime + : monitoredEntityBean.getLastMonitoredTime(); + Date newCheckPointTime = endTime != null ? endTime : now(); Entity entity = EntityUtil.getEntity(entityType, entityName); - Set<String> clusters = EntityUtil.getClustersDefined(entity); - List<org.apache.falcon.entity.v0.cluster.Cluster> cluster = new ArrayList(); - for(String string : clusters){ - cluster.add(ClusterHelper.getCluster(string)); + Set<String> clustersDefined = EntityUtil.getClustersDefined(entity); + List<org.apache.falcon.entity.v0.cluster.Cluster> clusters = new ArrayList(); + for(String cluster : clustersDefined){ + clusters.add(ClusterHelper.getCluster(cluster)); } - for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : cluster) { + for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : clusters) { if (currentClusters.contains(entityCluster.getName())) { - // get start of instances from the database - Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(entityName, - entityType); - Pair<String, String> key = new Pair<>(entity.getName(), entityCluster.getName()); - if (nextInstanceTime == null) { - nextInstanceTime = getInitialStartTime(entity, entityCluster.getName(), entityType); - } else { - nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); - } - - Set<Date> instances = new HashSet<>(); - org.apache.falcon.entity.v0.cluster.Cluster currentCluster = - EntityUtil.getEntity(EntityType.CLUSTER, entityCluster.getName()); - nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime); - LOG.trace("nextInstanceTime:"+ nextInstanceTime + "entityName:"+entityName); - Date endDate; - if (entityType.equals(EntityType.FEED.toString())){ - endDate = FeedHelper.getClusterValidity((Feed) entity, currentCluster.getName()).getEnd(); - }else { - endDate = ProcessHelper.getClusterValidity((Process) entity, - currentCluster.getName()).getEnd(); - } - while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) { - LOG.trace("Adding pending instance={} for <entity,cluster>={}; entityType={}", - nextInstanceTime, key, entityType); - instances.add(nextInstanceTime); - nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS); - nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime); - } - - for(Date date:instances){ - MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), entityCluster.getName(), date, - entityType); - } + List<Date> instances = EntityUtil.getEntityInstanceTimesInBetween(entity, entityCluster.getName(), + lastMonitoredInstanceTime, newCheckPointTime); + addPendingInstances(entityType, entity, entityCluster.getName(), instances); + // update last monitored time with the new checkpoint time + MONITORING_JDBC_STATE_STORE.updateLastMonitoredTime(entityName, entityType, + new Date(newCheckPointTime.getTime() + MINUTE_DELAY)); } } } @@ -421,17 +414,17 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList */ private void checkPendingInstanceAvailability(String entityType) throws FalconException { if (MONITORING_JDBC_STATE_STORE.getAllPendingInstances() == null){ - LOG.info("Returning as size of pending instance is zero"); + LOG.info("No pending instances to be checked"); return; } for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){ - for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(), + for (Date instanceTime : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(), pendingInstanceBean.getClusterName(), entityType)) { boolean status = checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(), - pendingInstanceBean.getClusterName(), date, entityType); + pendingInstanceBean.getClusterName(), instanceTime, entityType); if (status) { MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getEntityName(), - pendingInstanceBean.getClusterName(), date, EntityType.FEED.toString()); + pendingInstanceBean.getClusterName(), instanceTime, EntityType.FEED.toString()); } } } @@ -441,7 +434,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList private boolean checkEntityInstanceAvailability(String entityName, String clusterName, Date nominalTime, String entityType) throws FalconException { Entity entity = EntityUtil.getEntity(entityType, entityName); - authenticateUser(entity); + authenticateUser(); try { if (entityType.equals(EntityType.PROCESS.toString())){ LOG.trace("Checking instance availability status for entity:{}, cluster:{}, " @@ -479,8 +472,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList /** - * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} and {@link org.apache.falcon.entity.v0.process.Process} - * instances between given time range which have missed slaLow or slaHigh. + * Returns all the instances between given time range which have missed slaLow or slaHigh for given entity. * * Only entities which have defined sla in their definition are considered. * Only the entity instances between the given time range are considered. @@ -534,13 +526,13 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList } /** - * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances of a given feed between the given time range + * Returns all the instances of a given entity between the given time range * which missed sla.Only those instances are included which have missed either slaLow or slaHigh. * @param entityName name of the feed * @param clusterName cluster name * @param start start time, inclusive * @param end end time, inclusive - * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA. + * @return Pending instances of the given entity which belong to the given time range and have missed SLA. * @throws FalconException */ public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName, @@ -634,7 +626,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList } Date startTime = FeedHelper.getFeedValidityStart((Feed) entity, clusterName); Frequency slaLow = sla.getSlaLow(); - Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow)); + Date slaTime = new Date(now().getTime() - DateUtil.getFrequencyInMillis(slaLow)); return startTime.before(slaTime) ? startTime : slaTime; } else{ org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(clusterName, (Process)entity); @@ -644,7 +636,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList } Date startTime = ProcessHelper.getProcessValidityStart((Process) entity, clusterName); Frequency slaLow = sla.getShouldEndIn(); - Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow)); + Date slaTime = new Date(now().getTime() - DateUtil.getFrequencyInMillis(slaLow)); return startTime.before(slaTime) ? startTime : slaTime; } } @@ -666,10 +658,9 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList } } - private void authenticateUser(Entity entity){ - if (StringUtils.isNotBlank(entity.getACL().getOwner())) { - CurrentUser.authenticate(entity.getACL().getOwner()); - } else { + // Authenticate user only if not already authenticated. + private void authenticateUser(){ + if (!CurrentUser.isAuthenticated()) { CurrentUser.authenticate(System.getProperty("user.name")); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java index 018c562..a64b654 100644 --- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -39,6 +39,8 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.apache.falcon.util.DateUtil.now; + /** *Unit test for MonitoringJdbcStateStore. * */ @@ -86,11 +88,11 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { @Test public void testInsertRetrieveAndUpdate() throws Exception { - monitoringJdbcStateStore.putMonitoredEntity("test_feed1", EntityType.FEED.toString()); - monitoringJdbcStateStore.putMonitoredEntity("test_feed2", EntityType.FEED.toString()); + monitoringJdbcStateStore.putMonitoredEntity("test_feed1", EntityType.FEED.toString(), now()); + monitoringJdbcStateStore.putMonitoredEntity("test_feed2", EntityType.FEED.toString(), now()); Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredEntity("test_feed1", - EntityType.FEED.toString()).getFeedName()); - Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredEntity().size(), 2); + EntityType.FEED.toString()).getEntityName()); + Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredEntities().size(), 2); monitoringJdbcStateStore.deleteMonitoringEntity("test_feed1", EntityType.FEED.toString()); monitoringJdbcStateStore.deleteMonitoringEntity("test_feed2", EntityType.FEED.toString()); @@ -109,10 +111,27 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { } @Test + public void testUpdateAndGetLastMonitoredTime() throws Exception { + Date expectedLastMonitoredTime = now(); + monitoringJdbcStateStore.putMonitoredEntity("test-process", EntityType.PROCESS.toString(), + expectedLastMonitoredTime); + Date actualLastMonitoredTime = monitoringJdbcStateStore.getMonitoredEntity("test-process", + EntityType.PROCESS.toString()).getLastMonitoredTime(); + Assert.assertEquals(actualLastMonitoredTime, expectedLastMonitoredTime); + + Date updatedLastMonitoredTime = new Date(now().getTime() + 600000L); + monitoringJdbcStateStore.updateLastMonitoredTime("test-process", EntityType.PROCESS.toString(), + updatedLastMonitoredTime); + actualLastMonitoredTime = monitoringJdbcStateStore.getMonitoredEntity("test-process", + EntityType.PROCESS.toString()).getLastMonitoredTime(); + Assert.assertEquals(actualLastMonitoredTime, updatedLastMonitoredTime); + } + + @Test public void testEmptyLatestInstance() throws Exception { MonitoringJdbcStateStore store = new MonitoringJdbcStateStore(); - store.putMonitoredEntity("test-feed1", EntityType.FEED.toString()); - store.putMonitoredEntity("test-feed2", EntityType.FEED.toString()); + store.putMonitoredEntity("test-feed1", EntityType.FEED.toString(), now()); + store.putMonitoredEntity("test-feed2", EntityType.FEED.toString(), now()); Assert.assertNull(store.getLastInstanceTime("test-feed1", EntityType.FEED.toString())); Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java index 8b51354..347e39a 100644 --- a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java +++ b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java @@ -45,7 +45,7 @@ import java.io.File; import java.util.Date; /** - * Test for EntitySLAMonitoringService. + * Test for SLA Alerts. */ public class EntitySLAAlertServiceTest extends AbstractTestBase { private static final String DB_BASE_DIR = "target/test-data/persistancedb"; http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 6d82516..8eb58b9 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -107,6 +107,10 @@ prism.application.services=org.apache.falcon.service.LifecyclePolicyMap,\ org.apache.falcon.entity.store.FeedLocationStore,\ org.apache.falcon.service.EntitySLAMonitoringService,\ org.apache.falcon.service.SharedLibraryHostingService + +## if you wish to use BacklogEmitterService please add BackLogEmitter service as a configstore listners.## +# org.apache.falcon.service.BacklogMetricEmitterService + ## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ## # org.apache.falcon.state.store.jdbc.JDBCStateStore @@ -124,6 +128,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ ##### Workflow Job Execution Completion listeners ##### *.workflow.execution.listeners=org.apache.falcon.handler.SLAMonitoringHandler #org.apache.falcon.service.LogMoverService +#org.apache.falcon.service.BacklogMetricEmitterService ######### Implementation classes ######### @@ -281,6 +286,9 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ # Authorization Enabled flag: false (default)|true *.falcon.security.authorization.enabled=false +# The name of the group of super-users +*.falcon.security.authorization.superusergroup=falcon + # CSRF filter enabled flag: false (default) | true *.falcon.security.csrf.enabled=false @@ -290,9 +298,6 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ # Browser user agents to be filtered *.falcon.security.csrf.browser=^Mozilla.*,^Opera.* -# The name of the group of super-users -*.falcon.security.authorization.superusergroup=falcon - # Admin Users, comma separated users *.falcon.security.authorization.admin.users=falcon,ambari-qa @@ -368,5 +373,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.falcon.postprocessing.enable=true ### LogMoveService Properties -*.falcon.logMoveService.threadCount=200 +*.falcon.logMoveService.max.threadCount=200 *.falcon.logMoveService.blockingQueue.length=50 +##Note min threadCount should always be smaller than max threadCount. +*.falcon.logMoveService.min.threadCount=20
