Repository: falcon Updated Branches: refs/heads/master e8d1de35d -> b568a050a
FALCON-2042 Persist Feed SLA misses in DB Author: Praveen Adlakha <[email protected]> Reviewers: @pallavi-rao Closes #194 from PraveenAdlakha/1493 and squashes the following commits: 32c4109 [Praveen Adlakha] comments addressed fe3616e [Praveen Adlakha] Test cases fixed eb25312 [Praveen Adlakha] Pallavi's comment addressed abee43f [Praveen Adlakha] WIP 2acb958 [Praveen Adlakha] FALCON-2042 Persist Feed SLA misses in DB b4d4310 [Praveen Adlakha] WIP 92b785b [Praveen Adlakha] Listerners added be97853 [Praveen Adlakha] WIP 70ba220 [Praveen Adlakha] WIP 37161a3 [Praveen Adlakha] WIP 7f50bf4 [Praveen Adlakha] WIP need to add created date 762482d [Praveen Adlakha] WIP Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/b568a050 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/b568a050 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/b568a050 Branch: refs/heads/master Commit: b568a050a78c90b7dfc01231f3bed913e0d27f7b Parents: e8d1de3 Author: Praveen Adlakha <[email protected]> Authored: Mon Jun 27 14:38:04 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Jun 27 14:38:04 2016 +0530 ---------------------------------------------------------------------- .../org/apache/falcon/entity/FeedHelper.java | 7 + .../falcon/persistence/FeedSLAAlertBean.java | 134 +++++++++++++++ .../falcon/persistence/PendingInstanceBean.java | 3 +- .../persistence/PersistenceConstants.java | 7 + .../falcon/tools/FalconStateStoreDBCLI.java | 1 + .../java/org/apache/falcon/util/DateUtil.java | 2 +- .../src/main/resources/META-INF/persistence.xml | 10 +- common/src/main/resources/startup.properties | 3 + .../falcon/jdbc/MonitoringJdbcStateStore.java | 100 +++++++++++- .../falcon/service/EntitySLAListener.java | 29 ++++ .../falcon/service/FeedSLAAlertService.java | 162 +++++++++++++++++++ .../service/FeedSLAMonitoringService.java | 9 +- .../jdbc/MonitoringJdbcStateStoreTest.java | 28 ++++ 
.../falcon/service/FeedSLAAlertServiceTest.java | 162 +++++++++++++++++++ .../falcon/service/FeedSLAMonitoringTest.java | 9 +- src/build/findbugs-exclude.xml | 13 +- 16 files changed, 655 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index ee6837e..ea34d34 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -1287,4 +1287,11 @@ public final class FeedHelper { } return null; } + + public static List<FeedInstanceStatus> getListing(Feed feed, String clusterName, LocationType locationType, + Date start, Date end) throws FalconException{ + Storage storage= createStorage(clusterName, feed); + return storage.getListing(feed, clusterName, locationType, start, end); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java new file mode 100644 index 0000000..4ea3454 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.persistence; + +import java.util.Date; + +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.validation.constraints.NotNull; + +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** + * Feed SLA monitoring. 
+ * */ +@Entity +@NamedQueries({ +@NamedQuery(name = PersistenceConstants.GET_FEED_ALERTS, query = "select OBJECT(a) from FeedSLAAlertBean a where a.feedName = :feedName"), +@NamedQuery(name = PersistenceConstants.GET_ALL_FEED_ALERTS, query = "OBJECT(a) from PendingInstanceBean a "), +@NamedQuery(name = PersistenceConstants.GET_SLA_HIGH_CANDIDATES, query = "select OBJECT(a) from FeedSLAAlertBean a where a.isSLALowMissed = true and a.isSLAHighMissed = false "), + @NamedQuery(name = PersistenceConstants.UPDATE_SLA_HIGH, query = "update FeedSLAAlertBean a set a.isSLAHighMissed = true where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), +@NamedQuery(name = PersistenceConstants.GET_FEED_ALERT_INSTANCE, query = "select OBJECT(a) from FeedSLAAlertBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime "), + @NamedQuery(name = PersistenceConstants.DELETE_FEED_ALERT_INSTANCE, query = "delete from FeedSLAAlertBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime") +}) +@Table(name = "FEED_SLA_ALERTS") +//RESUME CHECKSTYLE CHECK LineLengthCheck +public class FeedSLAAlertBean { + @NotNull + @GeneratedValue(strategy = GenerationType.AUTO) + @Id + private String id; + + @Basic + @NotNull + @Column(name = "feed_name") + private String feedName; + + @Basic + @NotNull + @Column(name = "cluster_name") + private String clusterName; + + @Basic + @NotNull + @Column(name = "nominal_time") + private Date nominalTime; + + @Basic + @Column(name = "sla_low_missed") + private Boolean isSLALowMissed = false; + + @Basic + @Column(name = "sla_high_missed") + private Boolean isSLAHighMissed = false; + + @Basic + @Column(name = "sla_low_alert_sent") + private Boolean slaLowAlertSent; + + + @Basic + @Column(name = "sla_high_alert_sent") + private Boolean slaHighAlertSent; + + public Date getNominalTime() { + return new Date(nominalTime.getTime()); + } + + 
public void setNominalTime(Date nominalTime) { + this.nominalTime = new Date(nominalTime.getTime()); + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getFeedName() { + return feedName; + } + + public void setFeedName(String feedName) { + this.feedName = feedName; + } + + public Boolean getIsSLALowMissed() { + return isSLALowMissed; + } + + public void setIsSLALowMissed(Boolean isSLALowMissed) { + this.isSLALowMissed = isSLALowMissed; + } + + public Boolean getIsSLAHighMissed() { + return isSLAHighMissed; + } + + public void setIsSLAHighMissed(Boolean isSLAHighMissed) { + this.isSLAHighMissed = isSLAHighMissed; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java index 108001d..41eb048 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java @@ -40,7 +40,8 @@ import java.util.Date; @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where 
a.feedName = :feedName and a.clusterName = :clusterName"), - @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a ") + @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a "), + @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime") }) @Table(name = "PENDING_INSTANCES") //RESUME CHECKSTYLE CHECK LineLengthCheck http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index 44edc7c..72c382e 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -28,6 +28,7 @@ public final class PersistenceConstants { public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES"; public static final String GET_ALL_MONITORING_FEEDS = "GET_ALL_MONITORING_FEEDS"; public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES"; + public static final String GET_PENDING_INSTANCE = "GET_PENDING_INSTANCE"; public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES"; public static final String DELETE_ALL_INSTANCES_FOR_FEED = "DELETE_ALL_INSTANCES_FOR_FEED"; public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES"; @@ -54,4 +55,10 @@ public final class PersistenceConstants { public static final String DELETE_INSTANCES_TABLE = "DELETE_INSTANCES_TABLE"; public static final 
String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE"; public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME"; + public static final String GET_FEED_ALERTS = "GET_FEED_ALERTS"; + public static final String GET_ALL_FEED_ALERTS = "GET_ALL_FEED_ALERTS"; + public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES"; + public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH"; + public static final String GET_FEED_ALERT_INSTANCE = "GET_FEED_ALERT_INSTANCE"; + public static final String DELETE_FEED_ALERT_INSTANCE = "DELETE_FEED_ALERT_INSTANCE"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java index d56087a..1bdfc25 100644 --- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java +++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java @@ -245,6 +245,7 @@ public class FalconStateStoreDBCLI { args.add("org.apache.falcon.persistence.InstanceBean"); args.add("org.apache.falcon.persistence.PendingInstanceBean"); args.add("org.apache.falcon.persistence.MonitoredFeedsBean"); + args.add("org.apache.falcon.persistence.FeedSLAAlertBean"); return args.toArray(new String[args.size()]); } http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/common/src/main/java/org/apache/falcon/util/DateUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java index 9e9b8e8..2a2f3d4 100644 --- a/common/src/main/java/org/apache/falcon/util/DateUtil.java +++ 
b/common/src/main/java/org/apache/falcon/util/DateUtil.java @@ -78,7 +78,7 @@ public final class DateUtil { activeTimeMask = (entityInUTC) ? ISO8601_UTC_MASK : ISO8601_TZ_MASK_WITHOUT_OFFSET + tz.substring(3); } - public static Date getNextMinute(Date time) throws Exception { + public static Date getNextMinute(Date time) { Calendar insCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); insCal.setTime(time); insCal.add(Calendar.MINUTE, 1); http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/common/src/main/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml index 4c9388c..c9b444d 100644 --- a/common/src/main/resources/META-INF/persistence.xml +++ b/common/src/main/resources/META-INF/persistence.xml @@ -27,6 +27,7 @@ <class>org.apache.falcon.persistence.InstanceBean</class> <class>org.apache.falcon.persistence.PendingInstanceBean</class> <class>org.apache.falcon.persistence.MonitoredFeedsBean</class> + <class>org.apache.falcon.persistence.FeedSLAAlertBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -36,7 +37,7 @@ <property name="openjpa.MetaDataFactory" value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; - org.apache.falcon.persistence.MonitoredFeedsBean)"></property> + org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> @@ -47,6 +48,7 @@ <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/> <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/> <property 
name="openjpa.Log" value="log4j"/> + <property name="openjpa.DynamicEnhancementAgent" value="true"/> </properties> </persistence-unit> @@ -57,6 +59,7 @@ <class>org.apache.falcon.persistence.InstanceBean</class> <class>org.apache.falcon.persistence.PendingInstanceBean</class> <class>org.apache.falcon.persistence.MonitoredFeedsBean</class> + <class>org.apache.falcon.persistence.FeedSLAAlertBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -66,7 +69,7 @@ <property name="openjpa.MetaDataFactory" value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; - org.apache.falcon.persistence.MonitoredFeedsBean)"></property> + org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> @@ -87,6 +90,7 @@ <class>org.apache.falcon.persistence.InstanceBean</class> <class>org.apache.falcon.persistence.MonitoredFeedsBean</class> <class>org.apache.falcon.persistence.PendingInstanceBean</class> + <class>org.apache.falcon.persistence.FeedSLAAlertBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -96,7 +100,7 @@ <property name="openjpa.MetaDataFactory" value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; - org.apache.falcon.persistence.MonitoredFeedsBean)"></property> + org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> 
http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 0990035..374ff17 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -72,6 +72,9 @@ ## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ## # org.apache.falcon.state.store.jdbc.JdbcStateStore +## If you wish to use Feed Alert to know when a feed misses a high SLA register your class here +*.feedAlert.listeners= + ##### JMS MQ Broker Implementation class ##### *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index 9c9efa0..4fd1b53 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -19,13 +19,15 @@ package org.apache.falcon.jdbc; import org.apache.commons.collections.CollectionUtils; import org.apache.falcon.persistence.MonitoredFeedsBean; -import org.apache.falcon.persistence.PendingInstanceBean; +import org.apache.falcon.persistence.FeedSLAAlertBean; import org.apache.falcon.persistence.PersistenceConstants; +import org.apache.falcon.persistence.PendingInstanceBean; import org.apache.falcon.persistence.ResultNotFoundException; import org.apache.falcon.service.FalconJPAService; import javax.persistence.EntityManager; import javax.persistence.Query; +import javax.persistence.TypedQuery; import 
java.util.Date; import java.util.List; @@ -80,7 +82,7 @@ public class MonitoringJdbcStateStore { } } - public List<MonitoredFeedsBean> getAllMonitoredFeed() throws ResultNotFoundException{ + public List<MonitoredFeedsBean> getAllMonitoredFeed() throws ResultNotFoundException { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS); List result = q.getResultList(); @@ -166,6 +168,100 @@ public class MonitoringJdbcStateStore { entityManager.close(); } + public PendingInstanceBean getPendingInstance(String feedName, String clusterName, Date nominalTime) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + TypedQuery<PendingInstanceBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_PENDING_INSTANCE, + PendingInstanceBean.class); + q.setParameter("feedName", feedName); + + q.setParameter("clusterName", clusterName); + q.setParameter("nominalTime", nominalTime); + try { + return q.getSingleResult(); + } finally { + commitAndCloseTransaction(entityManager); + } + } + + public FeedSLAAlertBean getFeedAlertInstance(String feedName, String clusterName, Date nominalTime) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + TypedQuery<FeedSLAAlertBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_FEED_ALERT_INSTANCE, + FeedSLAAlertBean.class); + q.setParameter("feedName", feedName); + q.setParameter("clusterName", clusterName); + q.setParameter("nominalTime", nominalTime); + try { + return q.getSingleResult(); + } finally { + commitAndCloseTransaction(entityManager); + } + } + + public void putSLAAlertInstance(String feedName, String cluster, Date nominalTime, Boolean isSLALowMissed, + Boolean isSLAHighMissed) { + EntityManager entityManager = getEntityManager(); + FeedSLAAlertBean feedSLAAlertBean = new FeedSLAAlertBean(); + feedSLAAlertBean.setFeedName(feedName); + 
feedSLAAlertBean.setClusterName(cluster); + feedSLAAlertBean.setNominalTime(nominalTime); + feedSLAAlertBean.setIsSLALowMissed(isSLALowMissed); + feedSLAAlertBean.setIsSLAHighMissed(isSLAHighMissed); + try { + beginTransaction(entityManager); + entityManager.persist(feedSLAAlertBean); + } finally { + commitAndCloseTransaction(entityManager); + } + } + + public void updateSLAAlertInstance(String feedName, String clusterName, Date nominalTime) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery(PersistenceConstants.UPDATE_SLA_HIGH); + q.setParameter("feedName", feedName); + q.setParameter("clusterName", clusterName); + q.setParameter("nominalTime", nominalTime); + try{ + q.executeUpdate(); + } finally { + commitAndCloseTransaction(entityManager); + } + } + + public void deleteFeedAlertInstance(String feedName, String clusterName, Date nominalTime){ + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_FEED_ALERT_INSTANCE); + q.setParameter("feedName", feedName); + q.setParameter("clusterName", clusterName); + q.setParameter("nominalTime", nominalTime); + try{ + q.executeUpdate(); + } finally { + commitAndCloseTransaction(entityManager); + } + } + + + public List<FeedSLAAlertBean> getSLAHighCandidates() { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_SLA_HIGH_CANDIDATES); + List result = q.getResultList(); + + try { + if (CollectionUtils.isEmpty(result)) { + return null; + } + } finally{ + entityManager.close(); + } + return result; + } + + private void beginTransaction(EntityManager entityManager) { entityManager.getTransaction().begin(); } http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java 
---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java new file mode 100644 index 0000000..c0ab257 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.service; + +import org.apache.falcon.FalconException; + +import java.util.Date; + +/** + * Interface for FeedSLAAlert to be used by Listeners. 
+ */ +public interface EntitySLAListener { + void highSLAMissed(String feedName , String clusterName, Date nominalTime) throws FalconException; +} http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java new file mode 100644 index 0000000..4f6dab7 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.service; + + +import java.util.Date; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.ArrayList; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.FeedHelper; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.feed.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.jdbc.MonitoringJdbcStateStore; +import org.apache.falcon.persistence.PendingInstanceBean; +import org.apache.falcon.resource.SchedulableEntityInstance; +import org.apache.falcon.util.ReflectionUtils; +import org.apache.falcon.util.StartupProperties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to know which all feeds have missed SLA. + */ +public final class FeedSLAAlertService implements FalconService, EntitySLAListener { + + private static final String NAME = "FeedSLAAlertService"; + + private static final Logger LOG = LoggerFactory.getLogger(FeedSLAAlertService.class); + + private MonitoringJdbcStateStore store = new MonitoringJdbcStateStore(); + + private Set<EntitySLAListener> listeners = new LinkedHashSet<EntitySLAListener>(); + + private static final FeedSLAAlertService SERVICE = new FeedSLAAlertService(); + + public static FeedSLAAlertService get() { + return SERVICE; + } + + private FeedSLAAlertService(){} + + + @Override + public String getName() { + return NAME; + } + + @Override + public void init() throws FalconException { + String listenerClassNames = StartupProperties.get(). 
+ getProperty("feedAlert.listeners"); + for (String listenerClassName : listenerClassNames.split(",")) { + listenerClassName = listenerClassName.trim(); + if (listenerClassName.isEmpty()) { + continue; + } + EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName); + registerListener(listener); + } + + String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600"); + int statusCheckFrequencySeconds = Integer.parseInt(freq); + + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds + 10, TimeUnit.SECONDS); + } + + public void registerListener(EntitySLAListener listener) { + listeners.add(listener); + } + + @Override + public void destroy() throws FalconException { + + } + + + private class Monitor implements Runnable { + + @Override + public void run() { + processSLACandidates(); + } + } + + void processSLACandidates(){ + //Get all feeds instances to be monitored + List<PendingInstanceBean> pendingInstanceBeanList = store.getAllInstances(); + if (pendingInstanceBeanList.isEmpty()){ + return; + } + + LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size()); + try{ + for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) { + + String feedName = pendingInstanceBean.getFeedName(); + String clusterName = pendingInstanceBean.getClusterName(); + Date nominalTime = pendingInstanceBean.getNominalTime(); + Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName); + + Cluster cluster = FeedHelper.getCluster(feed, clusterName); + + Set<SchedulableEntityInstance> schedulableEntityInstances= FeedSLAMonitoringService.get(). 
+ getFeedSLAMissPendingAlerts(feed.getName(), cluster.getName(), nominalTime, nominalTime); + if (schedulableEntityInstances.isEmpty()){ + store.deleteFeedAlertInstance(feed.getName(), cluster.getName(), nominalTime); + return; + } + List<SchedulableEntityInstance> schedulableEntityList = new ArrayList<>(schedulableEntityInstances); + SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0); + + + if (schedulableEntityInstance.getTags().contains(FeedSLAMonitoringService.get().TAG_WARN)) { + store.putSLAAlertInstance(feedName, clusterName, nominalTime, true, false); + //Mark in DB as SLA missed + LOG.info("Feed :"+ feedName + + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLALow"); + } else if (schedulableEntityInstance.getTags().contains(FeedSLAMonitoringService.get().TAG_CRITICAL)){ + store.updateSLAAlertInstance(feedName, clusterName, nominalTime); + LOG.info("Feed :"+ feedName + + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLAHigh"); + highSLAMissed(feedName, clusterName, nominalTime); + } + } + } catch (FalconException e){ + LOG.error("Exception in FeedSLAALertService:", e); + } + + } + + @Override + public void highSLAMissed(String feedName , String clusterName, Date nominalTime) throws FalconException{ + for (EntitySLAListener listener : listeners) { + listener.highSLAMissed(feedName, clusterName, nominalTime); + } + store.deleteFeedAlertInstance(feedName, clusterName, nominalTime); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java index 9de4463..ed7bb08 100644 --- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java +++ 
b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java @@ -68,6 +68,9 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen private static final FeedSLAMonitoringService SERVICE = new FeedSLAMonitoringService(); + public static final String TAG_CRITICAL = "Missed-SLA-High"; + public static final String TAG_WARN = "Missed-SLA-Low"; + private FeedSLAMonitoringService() { } @@ -410,8 +413,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, List<Date> missingInstances) throws FalconException { - String tagCritical = "Missed SLA High"; - String tagWarn = "Missed SLA Low"; Date now = new Date(); Frequency slaLow = sla.getSlaLow(); Frequency slaHigh = sla.getSlaHigh(); @@ -425,9 +426,9 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration); Date slaWarnTime = new Date(nominalTime.getTime() + slaLowDuration); if (slaCriticalTime.before(now)) { - result.add(new Pair<>(nominalTime, tagCritical)); + result.add(new Pair<>(nominalTime, TAG_CRITICAL)); } else if (slaWarnTime.before(now)) { - result.add(new Pair<>(nominalTime, tagWarn)); + result.add(new Pair<>(nominalTime, TAG_WARN)); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java index b43025d..a4a95be 100644 --- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -122,6 +122,31 @@ public class MonitoringJdbcStateStoreTest extends 
AbstractTestBase { } + @Test + public void testputSLALowCandidate() throws Exception{ + MonitoringJdbcStateStore store = new MonitoringJdbcStateStore(); + Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); + store.putSLAAlertInstance("test-feed1", "test-cluster", dateOne, Boolean.TRUE, Boolean.FALSE); + Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1", + "test-cluster", dateOne).getIsSLALowMissed()); + Assert.assertTrue(dateOne.equals(store.getFeedAlertInstance("test-feed1", + "test-cluster", dateOne).getNominalTime())); + store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne); + Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1", + "test-cluster", dateOne).getIsSLAHighMissed()); + } + + @Test + public void testupdateSLAHighCandidate() throws Exception{ + MonitoringJdbcStateStore store = new MonitoringJdbcStateStore(); + Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z"); + + store.putSLAAlertInstance("test-feed1", "test-cluster", dateOne, Boolean.TRUE, Boolean.FALSE); + store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne); + Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1", + "test-cluster", dateOne).getIsSLAHighMissed()); + } + private void clear() { EntityManager em = FalconJPAService.get().getEntityManager(); em.getTransaction().begin(); @@ -130,6 +155,9 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { query.executeUpdate(); query = em.createNativeQuery("delete from PENDING_INSTANCES"); query.executeUpdate(); + query = em.createNativeQuery("delete from FEED_SLA_ALERTS"); + query.executeUpdate(); + } finally { em.getTransaction().commit(); em.close(); http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java ---------------------------------------------------------------------- diff --git 
a/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java new file mode 100644 index 0000000..7c886c1 --- /dev/null +++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.service; + +import org.apache.falcon.FalconException; +import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.entity.AbstractTestBase; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.feed.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.feed.Sla; +import org.apache.falcon.jdbc.MonitoringJdbcStateStore; +import org.apache.falcon.tools.FalconStateStoreDBCLI; +import org.apache.falcon.util.StateStoreProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.io.File; +import java.util.Date; + +/** + * Test for FeedSLAAlertService. 
+ */ +public class FeedSLAAlertServiceTest extends AbstractTestBase { + private static final String DB_BASE_DIR = "target/test-data/persistancedb"; + protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; + protected static String url = "jdbc:derby:"+ dbLocation +";create=true"; + protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; + protected LocalFileSystem fs = new LocalFileSystem(); + + private static MonitoringJdbcStateStore monitoringJdbcStateStore; + private static FalconJPAService falconJPAService = FalconJPAService.get(); + + protected int execDBCLICommands(String[] args) { + return new FalconStateStoreDBCLI().run(args); + } + + public void createDB(String file) { + File sqlFile = new File(file); + String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" }; + int result = execDBCLICommands(argsCreate); + Assert.assertEquals(0, result); + Assert.assertTrue(sqlFile.exists()); + + } + + @BeforeClass + public void setup() throws Exception{ + StateStoreProperties.get().setProperty(FalconJPAService.URL, url); + Configuration localConf = new Configuration(); + fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf); + fs.mkdirs(new Path(DB_BASE_DIR)); + createDB(DB_SQL_FILE); + falconJPAService.init(); + this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); + this.conf = dfsCluster.getConf(); + monitoringJdbcStateStore = new MonitoringJdbcStateStore(); + } + + @BeforeMethod + public void init() { + clear(); + } + + private void clear() { + EntityManager em = FalconJPAService.get().getEntityManager(); + em.getTransaction().begin(); + try { + Query query = em.createNativeQuery("delete from MONITORED_FEEDS"); + query.executeUpdate(); + query = em.createNativeQuery("delete from PENDING_INSTANCES"); + query.executeUpdate(); + query = em.createNativeQuery("delete from FEED_SLA_ALERTS"); + query.executeUpdate(); + + } finally { + em.getTransaction().commit(); + em.close(); 
+ } + } + + @Test + public static void processSLALowCandidates() throws FalconException, InterruptedException{ + + Date dateOne = new Date(System.currentTimeMillis()-100000); + monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne); + org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters(); + Cluster testCluster = new Cluster(); + testCluster.setName("test-cluster"); + cluster.getClusters().add(testCluster); + Feed mockEntity = new Feed(); + mockEntity.setName("test-feed"); + mockEntity.setClusters(cluster); + if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) { + ConfigurationStore.get().publish(EntityType.FEED, mockEntity); + } + Sla sla = new Sla(); + Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes); + Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes); + sla.setSlaLow(frequencyLow); + sla.setSlaHigh(frequencyHigh); + mockEntity.setSla(sla); + + FeedSLAAlertService.get().init(); + Thread.sleep(10*1000); + Assert.assertTrue(monitoringJdbcStateStore.getFeedAlertInstance("test-feed", "test-cluster", + dateOne).getIsSLALowMissed()); + } + + @Test(expectedExceptions = javax.persistence.NoResultException.class) + public static void processSLAHighCandidates() throws FalconException, InterruptedException{ + + Date dateOne = new Date(System.currentTimeMillis()-130000); + monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne); + org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters(); + Cluster testCluster = new Cluster(); + testCluster.setName("test-cluster"); + cluster.getClusters().add(testCluster); + Feed mockEntity = new Feed(); + mockEntity.setName("test-feed"); + mockEntity.setClusters(cluster); + if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) { + ConfigurationStore.get().publish(EntityType.FEED, mockEntity); + } + 
Sla sla = new Sla(); + Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes); + Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes); + sla.setSlaLow(frequencyLow); + sla.setSlaHigh(frequencyHigh); + mockEntity.setSla(sla); + + FeedSLAAlertService.get().init(); + Thread.sleep(10*1000); + Assert.assertTrue(monitoringJdbcStateStore.getFeedAlertInstance("test-feed", "test-cluster", + dateOne).getIsSLAHighMissed()); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java index dbe0cf4..97cc459 100644 --- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java +++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java @@ -54,6 +54,7 @@ public class FeedSLAMonitoringTest extends AbstractTestBase { private static final String CLUSTER_NAME = "testCluster"; private static final String FEED_NAME = "testFeed"; private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + private static final String TAG_CRITICAL = FeedSLAMonitoringService.get().TAG_CRITICAL; @Test public void testSLAStatus() throws FalconException { @@ -75,10 +76,10 @@ public class FeedSLAMonitoringTest extends AbstractTestBase { Set<Pair<Date, String>> result = FeedSLAMonitoringService.get().getSLAStatus(sla, start, end, missingInstances); Set<Pair<Date, String>> expected = new HashSet<>(); - expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), "Missed SLA High")); - expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), "Missed SLA High")); - expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-07T00:00Z"), "Missed SLA High")); - expected.add(new 
Pair<>(SchemaHelper.parseDateUTC("2015-05-05T00:00Z"), "Missed SLA High")); + expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), TAG_CRITICAL)); + expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), TAG_CRITICAL)); + expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-07T00:00Z"), TAG_CRITICAL)); + expected.add(new Pair<>(SchemaHelper.parseDateUTC("2015-05-05T00:00Z"), TAG_CRITICAL)); Assert.assertEquals(result, expected); } http://git-wip-us.apache.org/repos/asf/falcon/blob/b568a050/src/build/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml index a6766df..62720e3 100644 --- a/src/build/findbugs-exclude.xml +++ b/src/build/findbugs-exclude.xml @@ -52,10 +52,10 @@ <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" /> </Match> - <!--<Match>--> - <!--<Class name="org.apache.falcon.persistence.PendingInstanceBean" />--> - <!--<Bug pattern="UWF_UNWRITTEN_FIELD" />--> - <!--</Match>--> + <Match> + <Class name="org.apache.falcon.persistence.FeedSLAAlertBean" /> + <Bug pattern="UWF_UNWRITTEN_FIELD,NP_BOOLEAN_RETURN_NULL" /> + </Match> <Match> @@ -63,11 +63,6 @@ <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" /> </Match> - <!--<Match>--> - <!--<Class name="org.apache.falcon.persistence.MonitoredFeedsBean" />--> - <!--<Bug pattern="UWF_UNWRITTEN_FIELD" />--> - <!--</Match>--> - <Match> <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT" /> </Match>
