FALCON-2052 Process SLA monitoring Author: Praveen Adlakha <[email protected]>
Reviewers: Ajay Yadava <[email protected]>, Pallavi Rao Closes #202 from PraveenAdlakha/2052 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/60e2f68b Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/60e2f68b Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/60e2f68b Branch: refs/heads/master Commit: 60e2f68b867476f600ba43dc4dafb97971a78b2e Parents: bd32b61 Author: Praveen Adlakha <[email protected]> Authored: Wed Jul 6 08:51:03 2016 +0530 Committer: Ajay Yadava <[email protected]> Committed: Wed Jul 6 08:51:03 2016 +0530 ---------------------------------------------------------------------- client/src/main/resources/process-0.1.xsd | 4 +- .../org/apache/falcon/entity/ProcessHelper.java | 36 ++ .../falcon/persistence/EntitySLAAlertBean.java | 168 +++++ .../falcon/persistence/FeedSLAAlertBean.java | 134 ---- .../falcon/persistence/MonitoredEntityBean.java | 103 +++ .../falcon/persistence/MonitoredFeedsBean.java | 73 --- .../falcon/persistence/PendingInstanceBean.java | 58 +- .../persistence/PersistenceConstants.java | 10 +- .../falcon/tools/FalconStateStoreDBCLI.java | 4 +- .../src/main/resources/META-INF/persistence.xml | 18 +- common/src/main/resources/startup.properties | 4 +- .../apache/falcon/entity/AbstractTestBase.java | 2 +- .../entity/store/FeedLocationStoreTest.java | 2 +- docs/src/site/twiki/FalconNativeScheduler.twiki | 2 +- docs/src/site/twiki/FeedSLAMonitoring.twiki | 2 +- .../falcon/handler/SLAMonitoringHandler.java | 13 +- .../falcon/jdbc/MonitoringJdbcStateStore.java | 134 ++-- .../AbstractSchedulableEntityManager.java | 8 +- .../proxy/SchedulableEntityManagerProxy.java | 2 +- .../falcon/service/EntitySLAAlertService.java | 168 +++++ .../falcon/service/EntitySLAListener.java | 3 +- .../service/EntitySLAMonitoringService.java | 644 +++++++++++++++++++ .../falcon/service/FeedSLAAlertService.java | 163 ----- .../service/FeedSLAMonitoringService.java | 450 ------------- .../jdbc/MonitoringJdbcStateStoreTest.java | 77 ++- .../service/EntitySLAAlertServiceTest.java | 213 ++++++ .../falcon/service/FeedSLAAlertServiceTest.java | 162 ----- .../falcon/service/FeedSLAMonitoringTest.java | 5 +- src/build/findbugs-exclude.xml | 4 +- src/conf/startup.properties | 6 +- 30 files changed, 1544 insertions(+), 1128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/client/src/main/resources/process-0.1.xsd ---------------------------------------------------------------------- diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd index 0d01e33..7ed8474 100644 --- a/client/src/main/resources/process-0.1.xsd +++ b/client/src/main/resources/process-0.1.xsd @@ -204,8 +204,7 @@ </xs:documentation> </xs:annotation> <xs:sequence> - <xs:element type="cluster" name="cluster" maxOccurs="unbounded" minOccurs="1"> - </xs:element> + <xs:element type="cluster" name="cluster" maxOccurs="unbounded" minOccurs="1"/> </xs:sequence> </xs:complexType> @@ -218,6 +217,7 @@ </xs:annotation> <xs:sequence> <xs:element type="validity" name="validity"/> + <xs:element type="sla" name="sla" minOccurs="0" maxOccurs="1"/> </xs:sequence> <xs:attribute type="IDENTIFIER" name="name" use="required"/> <xs:attribute type="xs:int" name="version" use="optional" default="0"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java index bbfca68..e563d18 100644 --- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java @@ -26,6 +26,8 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Output; +import org.apache.falcon.entity.v0.process.Validity; +import org.apache.falcon.entity.v0.process.Sla; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.expression.ExpressionHelper; import org.apache.falcon.resource.SchedulableEntityInstance; @@ -185,4 +187,38 @@ public final class ProcessHelper { } return result; } + + public static Validity getClusterValidity(Process process, String clusterName) throws FalconException { + org.apache.falcon.entity.v0.process.Cluster cluster = getCluster(process, clusterName); + if (cluster == null) { + throw new FalconException("Invalid cluster: " + clusterName + " for process: " + process.getName()); + } + return cluster.getValidity(); + } + + public static Sla getSLA(String clusterName, Process process) throws FalconException{ + Cluster cluster = getCluster(process, clusterName); + if (cluster == null){ + throw new FalconException("Invalid cluster: " + clusterName + " for process: " + process.getName()); + } + return getSLA(cluster, process); + } + + public static Sla getSLA(Cluster cluster, Process process) { + final Sla clusterSla = cluster.getSla(); + if (clusterSla != null) { + return clusterSla; + } + return process.getSla(); + } + + public static Date getProcessValidityStart(Process process, String clusterName) throws FalconException { + Cluster processCluster = getCluster(process, clusterName); + if (processCluster != null) { + return processCluster.getValidity().getStart(); + } else { + throw new FalconException("No matching cluster " + clusterName + + "found for process " + process.getName()); + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java new file mode 100644 index 0000000..e2096fe --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.persistence; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.EntityType; + +import java.util.Date; + +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.validation.constraints.NotNull; + +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** + * Feed SLA monitoring. + * */ +@Entity +@NamedQueries({ +@NamedQuery(name = PersistenceConstants.GET_ENTITY_ALERTS, query = "select OBJECT(a) from EntitySLAAlertBean a where a.entityName = :entityName and a.entityType = :entityType"), +@NamedQuery(name = PersistenceConstants.GET_ALL_ENTITY_ALERTS, query = "OBJECT(a) from PendingInstanceBean a "), +@NamedQuery(name = PersistenceConstants.GET_SLA_HIGH_CANDIDATES, query = "select OBJECT(a) from EntitySLAAlertBean a where a.isSLALowMissed = true and a.isSLAHighMissed = false "), + @NamedQuery(name = PersistenceConstants.UPDATE_SLA_HIGH, query = "update EntitySLAAlertBean a set a.isSLAHighMissed = true where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"), +@NamedQuery(name = PersistenceConstants.GET_ENTITY_ALERT_INSTANCE, query = "select OBJECT(a) from EntitySLAAlertBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"), + @NamedQuery(name = PersistenceConstants.DELETE_ENTITY_ALERT_INSTANCE, query = "delete from EntitySLAAlertBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType") +}) +@Table(name = "ENTITY_SLA_ALERTS") +//RESUME CHECKSTYLE CHECK LineLengthCheck +public class EntitySLAAlertBean { + @NotNull + @GeneratedValue(strategy = GenerationType.AUTO) + @Id + private String id; + + @Basic + @NotNull + @Column(name = "entity_name") + private String entityName; + + @Basic + @NotNull + @Column(name = "cluster_name") + private String clusterName; + + public String getEntityType() { + return entityType; + } + + public void setEntityType(String entityType) throws FalconException { + checkEntityType(entityType); + this.entityType = entityType; + } + + @Basic + @NotNull + @Column(name = "entity_type") + private String entityType; + + @Basic + @NotNull + @Column(name = "nominal_time") + private Date nominalTime; + + @Basic + @Column(name = "sla_low_missed") + private Boolean isSLALowMissed = false; + + @Basic + @Column(name = "sla_high_missed") + private Boolean isSLAHighMissed = false; + + @Basic + @Column(name = "sla_low_alert_sent") + private Boolean slaLowAlertSent; + + + @Basic + @Column(name = "sla_high_alert_sent") + private Boolean slaHighAlertSent; + + public Date getNominalTime() { + return new Date(nominalTime.getTime()); + } + + public void setNominalTime(Date nominalTime) { + this.nominalTime = new Date(nominalTime.getTime()); + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getEntityName() { + return entityName; + } + + public void setEntityName(String entityName) { + this.entityName = entityName; + } + + public Boolean getIsSLALowMissed() { + return isSLALowMissed; + } + + public void setIsSLALowMissed(Boolean isSLALowMissed) { + this.isSLALowMissed = isSLALowMissed; + } + + public Boolean getIsSLAHighMissed() { + return isSLAHighMissed; + } + + public void setIsSLAHighMissed(Boolean isSLAHighMissed) { + this.isSLAHighMissed = isSLAHighMissed; + } + + public static final String ENTITYNAME = "entityName"; + + public static final String CLUSTERNAME = "clusterName"; + + public static final String ENTITYTYPE = "entityType"; + + public static final String NOMINALTIME = "nominalTime"; + + void checkEntityType(String entityType)throws FalconException{ + if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){ + return; + } else { + throw new FalconException("EntityType"+ entityType + + " is not valid,Feed and Process are the valid input type."); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java deleted file mode 100644 index 4ea3454..0000000 --- a/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.persistence; - -import java.util.Date; - -import javax.persistence.Basic; -import javax.persistence.Column; -import javax.persistence.Entity; -import javax.persistence.GeneratedValue; -import javax.persistence.GenerationType; -import javax.persistence.Id; -import javax.persistence.NamedQueries; -import javax.persistence.NamedQuery; -import javax.persistence.Table; -import javax.validation.constraints.NotNull; - -//SUSPEND CHECKSTYLE CHECK LineLengthCheck -/** - * Feed SLA monitoring. - * */ -@Entity -@NamedQueries({ -@NamedQuery(name = PersistenceConstants.GET_FEED_ALERTS, query = "select OBJECT(a) from FeedSLAAlertBean a where a.feedName = :feedName"), -@NamedQuery(name = PersistenceConstants.GET_ALL_FEED_ALERTS, query = "OBJECT(a) from PendingInstanceBean a "), -@NamedQuery(name = PersistenceConstants.GET_SLA_HIGH_CANDIDATES, query = "select OBJECT(a) from FeedSLAAlertBean a where a.isSLALowMissed = true and a.isSLAHighMissed = false "), - @NamedQuery(name = PersistenceConstants.UPDATE_SLA_HIGH, query = "update FeedSLAAlertBean a set a.isSLAHighMissed = true where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), -@NamedQuery(name = PersistenceConstants.GET_FEED_ALERT_INSTANCE, query = "select OBJECT(a) from FeedSLAAlertBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime "), - @NamedQuery(name = PersistenceConstants.DELETE_FEED_ALERT_INSTANCE, query = "delete from FeedSLAAlertBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime") -}) -@Table(name = "FEED_SLA_ALERTS") -//RESUME CHECKSTYLE CHECK LineLengthCheck -public class FeedSLAAlertBean { - @NotNull - @GeneratedValue(strategy = GenerationType.AUTO) - @Id - private String id; - - @Basic - @NotNull - @Column(name = "feed_name") - private String feedName; - - @Basic - @NotNull - @Column(name = "cluster_name") - private String clusterName; - - @Basic - @NotNull - @Column(name = "nominal_time") - private Date nominalTime; - - @Basic - @Column(name = "sla_low_missed") - private Boolean isSLALowMissed = false; - - @Basic - @Column(name = "sla_high_missed") - private Boolean isSLAHighMissed = false; - - @Basic - @Column(name = "sla_low_alert_sent") - private Boolean slaLowAlertSent; - - - @Basic - @Column(name = "sla_high_alert_sent") - private Boolean slaHighAlertSent; - - public Date getNominalTime() { - return new Date(nominalTime.getTime()); - } - - public void setNominalTime(Date nominalTime) { - this.nominalTime = new Date(nominalTime.getTime()); - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getClusterName() { - return clusterName; - } - - public void setClusterName(String clusterName) { - this.clusterName = clusterName; - } - - public String getFeedName() { - return feedName; - } - - public void setFeedName(String feedName) { - this.feedName = feedName; - } - - public Boolean getIsSLALowMissed() { - return isSLALowMissed; - } - - public void setIsSLALowMissed(Boolean isSLALowMissed) { - this.isSLALowMissed = isSLALowMissed; - } - - public Boolean getIsSLAHighMissed() { - return isSLAHighMissed; - } - - public void setIsSLAHighMissed(Boolean isSLAHighMissed) { - this.isSLAHighMissed = isSLAHighMissed; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java new file mode 100644 index 0000000..20ce537 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.persistence; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.EntityType; + +import javax.persistence.Entity; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Column; +import javax.persistence.Basic; +import javax.validation.constraints.NotNull; + +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** +* The Feeds that are to be monitered will be stored in the db. +* */ + +@Entity +@NamedQueries({ + @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from " + + "MonitoredEntityBean a where a.entityName = :entityName and a.entityType = :entityType"), + @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredEntityBean " + + "a where a.entityName = :entityName and a.entityType = :entityType"), + @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) " + + "from MonitoredEntityBean a") +}) +@Table(name="MONITORED_ENTITY") +//RESUME CHECKSTYLE CHECK LineLengthCheck +public class MonitoredEntityBean { + @NotNull + @GeneratedValue(strategy = GenerationType.AUTO) + @Id + private String id; + + @Basic + @NotNull + @Column(name = "entity_name") + private String entityName; + + public String getEntityType() { + return entityType; + } + + public void setEntityType(String entityType) throws FalconException { + checkEntityType(entityType); + this.entityType = entityType; + } + + @Basic + @NotNull + @Column(name = "entity_type") + private String entityType; + + public String getFeedName() { + return entityName; + } + + public void setEntityName(String feedName) { + this.entityName = feedName; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public static final String ENTITYNAME = "entityName"; + + public static final String ENTITYTYPE = "entityType"; + + void checkEntityType(String entityType)throws FalconException { + if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){ + return; + } else { + throw new FalconException("EntityType"+ entityType + + " is not valid,Feed and Process are the valid input type."); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java deleted file mode 100644 index 2b48569..0000000 --- a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.persistence; - -import javax.persistence.Entity; -import javax.persistence.NamedQueries; -import javax.persistence.NamedQuery; -import javax.persistence.Table; -import javax.persistence.GeneratedValue; -import javax.persistence.GenerationType; -import javax.persistence.Id; -import javax.persistence.Column; -import javax.persistence.Basic; -import javax.validation.constraints.NotNull; - -//SUSPEND CHECKSTYLE CHECK LineLengthCheck -/** -* The Feeds that are to be monitered will be stored in the db. -* */ - -@Entity -@NamedQueries({ - @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from " - + "MonitoredFeedsBean a where a.feedName = :feedName"), - @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean " - + "a where a.feedName = :feedName"), - @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) " - + "from MonitoredFeedsBean a") -}) -@Table(name="MONITORED_FEEDS") -//RESUME CHECKSTYLE CHECK LineLengthCheck -public class MonitoredFeedsBean { - @NotNull - @GeneratedValue(strategy = GenerationType.AUTO) - @Id - private String id; - - @Basic - @NotNull - @Column(name = "feed_name") - private String feedName; - - public String getFeedName() { - return feedName; - } - - public void setFeedName(String feedName) { - this.feedName = feedName; - } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java index 41eb048..863abdc 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java +++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java @@ -17,6 +17,9 @@ */ package org.apache.falcon.persistence; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.EntityType; + import javax.persistence.Entity; import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; @@ -35,13 +38,13 @@ import java.util.Date; * */ @Entity @NamedQueries({ - @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.feedName = :feedName"), - @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"), - @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"), - @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), - @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"), + @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"), + @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"), + @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"), + @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"), + @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"), @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a "), - @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime") + @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType") }) @Table(name = "PENDING_INSTANCES") //RESUME CHECKSTYLE CHECK LineLengthCheck @@ -53,8 +56,8 @@ public class PendingInstanceBean { @Basic @NotNull - @Column(name = "feed_name") - private String feedName; + @Column(name = "entity_name") + private String entityName; @Basic @NotNull @@ -66,6 +69,20 @@ public class PendingInstanceBean { @Column(name = "nominal_time") private Date nominalTime; + public String getEntityType() { + return entityType; + } + + public void setEntityType(String entityType) throws FalconException { + checkEntityType(entityType); + this.entityType = entityType; + } + + @Basic + @NotNull + @Column(name = "entity_type") + private String entityType; + public Date getNominalTime() { return nominalTime; } @@ -90,11 +107,28 @@ public class PendingInstanceBean { this.clusterName = clusterName; } - public String getFeedName() { - return feedName; + public String getEntityName() { + return entityName; + } + + public void setEntityName(String entityName) { + this.entityName = entityName; } - public void setFeedName(String feedName) { - this.feedName = feedName; + public static final String ENTITYNAME = "entityName"; + + public static final String CLUSTERNAME = "clusterName"; + + public static final String NOMINALTIME = "nominalTime"; + + public static final String ENTITYTYPE = "entityType"; + + void checkEntityType(String entityType)throws FalconException { + if (entityType.equals(EntityType.PROCESS.toString()) || entityType.equals(EntityType.FEED.toString())){ + return; + } else { + throw new FalconException("EntityType"+ entityType + + " is not valid,Feed and Process are the valid input type."); + } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index 72c382e..f9aa1f5 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -30,7 +30,7 @@ public final class PersistenceConstants { public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES"; public static final String GET_PENDING_INSTANCE = "GET_PENDING_INSTANCE"; public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES"; - public static final String DELETE_ALL_INSTANCES_FOR_FEED = "DELETE_ALL_INSTANCES_FOR_FEED"; + public static final String DELETE_ALL_INSTANCES_FOR_ENTITY = "DELETE_ALL_INSTANCES_FOR_ENTITY"; public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES"; public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES"; public static final String GET_ENTITY = "GET_ENTITY"; @@ -55,10 +55,10 @@ public final class PersistenceConstants { public static final String DELETE_INSTANCES_TABLE = "DELETE_INSTANCES_TABLE"; public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE"; public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME"; - public static final String GET_FEED_ALERTS = "GET_FEED_ALERTS"; - public static final String GET_ALL_FEED_ALERTS = "GET_ALL_FEED_ALERTS"; + public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS"; + public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS"; public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES"; public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH"; - public static final String GET_FEED_ALERT_INSTANCE = "GET_FEED_ALERT_INSTANCE"; - public static final String DELETE_FEED_ALERT_INSTANCE = "DELETE_FEED_ALERT_INSTANCE"; + public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE"; + public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java index 1bdfc25..102b986 100644 --- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java +++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java @@ -244,8 +244,8 @@ public class FalconStateStoreDBCLI { args.add("org.apache.falcon.persistence.EntityBean"); args.add("org.apache.falcon.persistence.InstanceBean"); args.add("org.apache.falcon.persistence.PendingInstanceBean"); - args.add("org.apache.falcon.persistence.MonitoredFeedsBean"); - args.add("org.apache.falcon.persistence.FeedSLAAlertBean"); + args.add("org.apache.falcon.persistence.MonitoredEntityBean"); + args.add("org.apache.falcon.persistence.EntitySLAAlertBean"); return args.toArray(new String[args.size()]); } http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml index c9b444d..ac2f397 100644 --- a/common/src/main/resources/META-INF/persistence.xml +++ b/common/src/main/resources/META-INF/persistence.xml @@ -26,8 +26,8 @@ <class>org.apache.falcon.persistence.EntityBean</class> <class>org.apache.falcon.persistence.InstanceBean</class> <class>org.apache.falcon.persistence.PendingInstanceBean</class> - <class>org.apache.falcon.persistence.MonitoredFeedsBean</class> - <class>org.apache.falcon.persistence.FeedSLAAlertBean</class> + <class>org.apache.falcon.persistence.MonitoredEntityBean</class> + <class>org.apache.falcon.persistence.EntitySLAAlertBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -37,7 +37,7 @@ <property name="openjpa.MetaDataFactory" value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; - org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property> + org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> @@ -58,8 +58,8 @@ <class>org.apache.falcon.persistence.EntityBean</class> <class>org.apache.falcon.persistence.InstanceBean</class> <class>org.apache.falcon.persistence.PendingInstanceBean</class> - <class>org.apache.falcon.persistence.MonitoredFeedsBean</class> - <class>org.apache.falcon.persistence.FeedSLAAlertBean</class> + <class>org.apache.falcon.persistence.MonitoredEntityBean</class> + <class>org.apache.falcon.persistence.EntitySLAAlertBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -69,7 +69,7 @@ <property name="openjpa.MetaDataFactory" value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; - org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property> + org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> @@ -88,9 +88,9 @@ <class>org.apache.falcon.persistence.EntityBean</class> <class>org.apache.falcon.persistence.InstanceBean</class> - <class>org.apache.falcon.persistence.MonitoredFeedsBean</class> + <class>org.apache.falcon.persistence.MonitoredEntityBean</class> <class>org.apache.falcon.persistence.PendingInstanceBean</class> - <class>org.apache.falcon.persistence.FeedSLAAlertBean</class> + <class>org.apache.falcon.persistence.EntitySLAAlertBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -100,7 +100,7 @@ <property name="openjpa.MetaDataFactory" value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; - org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property> + org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 374ff17..de24621 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -35,7 +35,7 @@ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ org.apache.falcon.service.ProcessSubscriberService,\ org.apache.falcon.extensions.ExtensionService,\ - org.apache.falcon.service.FeedSLAMonitoringService,\ + org.apache.falcon.service.EntitySLAMonitoringService,\ org.apache.falcon.service.LifecyclePolicyMap,\ org.apache.falcon.entity.store.ConfigurationStore,\ org.apache.falcon.rerun.service.RetryService,\ @@ -67,7 +67,7 @@ org.apache.falcon.entity.ColoClusterRelation,\ org.apache.falcon.group.FeedGroupMap,\ org.apache.falcon.entity.store.FeedLocationStore,\ - org.apache.falcon.service.FeedSLAMonitoringService,\ + org.apache.falcon.service.EntitySLAMonitoringService,\ org.apache.falcon.service.SharedLibraryHostingService ## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ## # org.apache.falcon.state.store.jdbc.JdbcStateStore http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java index 3817056..afd9307 100644 --- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java +++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java @@ -79,7 +79,7 @@ public class AbstractTestBase { cleanupStore(); String listeners = StartupProperties.get().getProperty("configstore.listeners"); listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", ""); - listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService", ""); + listeners = listeners.replace("org.apache.falcon.service.EntitySLAMonitoringService", ""); StartupProperties.get().setProperty("configstore.listeners", listeners); store = ConfigurationStore.get(); store.init(); http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java index 40c077e..d13769e 100644 --- a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java +++ b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java @@ -63,7 +63,7 @@ public class FeedLocationStoreTest extends AbstractTestBase { cleanupStore(); String listeners = StartupProperties.get().getProperty("configstore.listeners"); listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", ""); - listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService", ""); + listeners = listeners.replace("org.apache.falcon.service.EntitySLAMonitoringService", ""); StartupProperties.get().setProperty("configstore.listeners", listeners); store = ConfigurationStore.get(); store.init(); http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/docs/src/site/twiki/FalconNativeScheduler.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconNativeScheduler.twiki b/docs/src/site/twiki/FalconNativeScheduler.twiki index 1f51739..b15fd5b 100644 --- a/docs/src/site/twiki/FalconNativeScheduler.twiki +++ b/docs/src/site/twiki/FalconNativeScheduler.twiki @@ -27,7 +27,7 @@ You can enable native scheduler by making changes to __$FALCON_HOME/conf/startup *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ org.apache.falcon.service.ProcessSubscriberService,\ - org.apache.falcon.service.FeedSLAMonitoringService,\ + org.apache.falcon.service.EntitySLAMonitoringService,\ org.apache.falcon.service.LifecyclePolicyMap,\ org.apache.falcon.service.FalconJPAService,\ org.apache.falcon.entity.store.ConfigurationStore,\ http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/docs/src/site/twiki/FeedSLAMonitoring.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FeedSLAMonitoring.twiki b/docs/src/site/twiki/FeedSLAMonitoring.twiki index 88132ce..469c0aa 100644 --- a/docs/src/site/twiki/FeedSLAMonitoring.twiki +++ b/docs/src/site/twiki/FeedSLAMonitoring.twiki @@ -6,7 +6,7 @@ Feed SLA monitoring service requires FalconJPAService to be up.Following are the In startup.properties : *.application.services= org.apache.falcon.state.store.service.FalconJPAService, - org.apache.falcon.service.FeedSLAMonitoringService + org.apache.falcon.service.EntitySLAMonitoringService These properties are required for FalconJPAService in statestore.properties: http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java index df2a1e0..56376fc 100644 --- a/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java +++ b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java @@ -26,7 +26,7 @@ import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.service.FeedSLAMonitoringService; +import org.apache.falcon.service.EntitySLAMonitoringService; import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.falcon.workflow.WorkflowExecutionListener; import org.apache.hadoop.fs.Path; @@ -45,8 +45,14 @@ public class SLAMonitoringHandler implements WorkflowExecutionListener { @Override public void onSuccess(WorkflowExecutionContext context) throws FalconException { if (context.hasWorkflowSucceeded()) { - updateSLAMonitoring(context.getClusterName(), context.getOutputFeedNamesList(), + if (context.getEntityType().toString().equals(EntityType.FEED.name())){ + updateSLAMonitoring(context.getClusterName(), context.getOutputFeedNamesList(), context.getOutputFeedInstancePathsList()); + } + if (context.getEntityType().toString().equals(EntityType.PROCESS.name())){ + EntitySLAMonitoringService.get().makeProcessInstanceAvailable(context.getClusterName(), + context.getEntityName(), context.getNominalTimeAsISO8601(), context.getEntityType()); + } } } @@ -60,7 +66,8 @@ public class SLAMonitoringHandler implements WorkflowExecutionListener { String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath(); Date date = FeedHelper.getDate(templatePath, new Path(outputFeedInstancePathsList[index]), EntityUtil.getTimeZone(feed)); - FeedSLAMonitoringService.get().makeFeedInstanceAvailable(outputFeedNamesList[index], clusterName, date); + EntitySLAMonitoringService.get().makeFeedInstanceAvailable(outputFeedNamesList[index], + clusterName, date); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index 4fd1b53..c1f818a 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -18,11 +18,13 @@ package org.apache.falcon.jdbc; import org.apache.commons.collections.CollectionUtils; -import org.apache.falcon.persistence.MonitoredFeedsBean; -import org.apache.falcon.persistence.FeedSLAAlertBean; -import org.apache.falcon.persistence.PersistenceConstants; + +import org.apache.falcon.FalconException; +import org.apache.falcon.persistence.MonitoredEntityBean; import org.apache.falcon.persistence.PendingInstanceBean; +import org.apache.falcon.persistence.PersistenceConstants; import org.apache.falcon.persistence.ResultNotFoundException; +import org.apache.falcon.persistence.EntitySLAAlertBean; import org.apache.falcon.service.FalconJPAService; import javax.persistence.EntityManager; @@ -32,7 +34,7 @@ import java.util.Date; import java.util.List; /** -* StateStore for MonitoringFeeds and PendingFeedInstances. +* StateStore for MonitoringEntity and PendingEntityInstances. */ public class MonitoringJdbcStateStore { @@ -42,23 +44,25 @@ public class MonitoringJdbcStateStore { } - public void putMonitoredFeed(String feedName){ + public void putMonitoredEntity(String entityName, String entityType) throws FalconException{ - MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean(); - monitoredFeedsBean.setFeedName(feedName); + MonitoredEntityBean monitoredEntityBean = new MonitoredEntityBean(); + monitoredEntityBean.setEntityName(entityName); + monitoredEntityBean.setEntityType(entityType); EntityManager entityManager = getEntityManager(); try { beginTransaction(entityManager); - entityManager.persist(monitoredFeedsBean); + entityManager.persist(monitoredEntityBean); } finally { commitAndCloseTransaction(entityManager); } } - public MonitoredFeedsBean getMonitoredFeed(String feedName){ + public MonitoredEntityBean getMonitoredEntity(String entityName, String entityType){ EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE); - q.setParameter("feedName", feedName); + q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName); + q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType); List result = q.getResultList(); try { if (result.isEmpty()) { @@ -67,14 +71,15 @@ public class MonitoringJdbcStateStore { } finally { entityManager.close(); } - return ((MonitoredFeedsBean)result.get(0)); + return ((MonitoredEntityBean)result.get(0)); } - public void deleteMonitoringFeed(String feedName) { + public void deleteMonitoringEntity(String entityName, String entityType) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES); - q.setParameter("feedName", feedName); + q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName); + q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType); try{ q.executeUpdate(); } finally { @@ -82,7 +87,7 @@ public class MonitoringJdbcStateStore { } } - public List<MonitoredFeedsBean> getAllMonitoredFeed() throws ResultNotFoundException { + public List<MonitoredEntityBean> getAllMonitoredFeed() throws ResultNotFoundException { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS); List result = q.getResultList(); @@ -90,22 +95,24 @@ public class MonitoringJdbcStateStore { return result; } - public Date getLastInstanceTime(String feedName) throws ResultNotFoundException { + public Date getLastInstanceTime(String entityName , String entityType) throws ResultNotFoundException { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_LATEST_INSTANCE_TIME, Date.class); - q.setParameter("feedName", feedName); + q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); Date result = (Date)q.getSingleResult(); entityManager.close(); return result; } - public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){ + public void deletePendingInstance(String entityName, String clusterName , Date nominalTime, String entityType){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES); - q.setParameter("feedName", feedName); - q.setParameter("clusterName", clusterName); - q.setParameter("nominalTime", nominalTime); + q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); + q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); + q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); try{ q.executeUpdate(); } finally { @@ -113,12 +120,13 @@ public class MonitoringJdbcStateStore { } } - public void deletePendingInstances(String feedName, String clusterName){ + public void deletePendingInstances(String entityName, String clusterName, String entityType){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); - Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED); - q.setParameter("feedName", feedName); - q.setParameter("clusterName", clusterName); + Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY); + q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); + q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); try{ q.executeUpdate(); } finally { @@ -126,23 +134,26 @@ public class MonitoringJdbcStateStore { } } - public void putPendingInstances(String feed, String clusterName, Date nominalTime){ + public void putPendingInstances(String entity, String clusterName, Date nominalTime, String entityType) + throws FalconException{ EntityManager entityManager = getEntityManager(); PendingInstanceBean pendingInstanceBean = new PendingInstanceBean(); - pendingInstanceBean.setFeedName(feed); + pendingInstanceBean.setEntityName(entity); pendingInstanceBean.setClusterName(clusterName); pendingInstanceBean.setNominalTime(nominalTime); + pendingInstanceBean.setEntityType(entityType); beginTransaction(entityManager); entityManager.persist(pendingInstanceBean); commitAndCloseTransaction(entityManager); } - public List<Date> getNominalInstances(String feedName, String clusterName) { + public List<Date> getNominalInstances(String entityName, String clusterName, String entityType) { EntityManager entityManager = getEntityManager(); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES); - q.setParameter("feedName", feedName); - q.setParameter("clusterName", clusterName); + q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); + q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); List result = q.getResultList(); entityManager.close(); return result; @@ -168,15 +179,17 @@ public class MonitoringJdbcStateStore { entityManager.close(); } - public PendingInstanceBean getPendingInstance(String feedName, String clusterName, Date nominalTime) { + public PendingInstanceBean getPendingInstance(String entityName, String clusterName, Date nominalTime, + String entityType) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); TypedQuery<PendingInstanceBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_PENDING_INSTANCE, PendingInstanceBean.class); - q.setParameter("feedName", feedName); + q.setParameter(PendingInstanceBean.ENTITYNAME, entityName); - q.setParameter("clusterName", clusterName); - q.setParameter("nominalTime", nominalTime); + q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName); + q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime); + q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType); try { return q.getSingleResult(); } finally { @@ -184,14 +197,16 @@ public class MonitoringJdbcStateStore { } } - public FeedSLAAlertBean getFeedAlertInstance(String feedName, String clusterName, Date nominalTime) { + public EntitySLAAlertBean getEntityAlertInstance(String entityName, String clusterName, Date nominalTime, + String entityType) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); - TypedQuery<FeedSLAAlertBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_FEED_ALERT_INSTANCE, - FeedSLAAlertBean.class); - q.setParameter("feedName", feedName); - q.setParameter("clusterName", clusterName); - q.setParameter("nominalTime", nominalTime); + TypedQuery<EntitySLAAlertBean> q = entityManager.createNamedQuery(PersistenceConstants. + GET_ENTITY_ALERT_INSTANCE, EntitySLAAlertBean.class); + q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName); + q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName); + q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime); + q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType); try { return q.getSingleResult(); } finally { @@ -199,30 +214,32 @@ public class MonitoringJdbcStateStore { } } - public void putSLAAlertInstance(String feedName, String cluster, Date nominalTime, Boolean isSLALowMissed, - Boolean isSLAHighMissed) { + public void putSLAAlertInstance(String entityName, String cluster, String entityType, Date nominalTime, + Boolean isSLALowMissed, Boolean isSLAHighMissed) throws FalconException{ EntityManager entityManager = getEntityManager(); - FeedSLAAlertBean feedSLAAlertBean = new FeedSLAAlertBean(); - feedSLAAlertBean.setFeedName(feedName); - feedSLAAlertBean.setClusterName(cluster); - feedSLAAlertBean.setNominalTime(nominalTime); - feedSLAAlertBean.setIsSLALowMissed(isSLALowMissed); - feedSLAAlertBean.setIsSLAHighMissed(isSLAHighMissed); + EntitySLAAlertBean entitySLAAlertBean = new EntitySLAAlertBean(); + entitySLAAlertBean.setEntityName(entityName); + entitySLAAlertBean.setClusterName(cluster); + entitySLAAlertBean.setNominalTime(nominalTime); + entitySLAAlertBean.setIsSLALowMissed(isSLALowMissed); + entitySLAAlertBean.setIsSLAHighMissed(isSLAHighMissed); + entitySLAAlertBean.setEntityType(entityType); try { beginTransaction(entityManager); - entityManager.persist(feedSLAAlertBean); + entityManager.persist(entitySLAAlertBean); } finally { commitAndCloseTransaction(entityManager); } } - public void updateSLAAlertInstance(String feedName, String clusterName, Date nominalTime) { + public void updateSLAAlertInstance(String entityName, String clusterName, Date nominalTime, String entityType) { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.UPDATE_SLA_HIGH); - q.setParameter("feedName", feedName); - q.setParameter("clusterName", clusterName); - q.setParameter("nominalTime", nominalTime); + q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName); + q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName); + q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime); + q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType); try{ q.executeUpdate(); } finally { @@ -230,13 +247,14 @@ public class MonitoringJdbcStateStore { } } - public void deleteFeedAlertInstance(String feedName, String clusterName, Date nominalTime){ + public void deleteEntityAlertInstance(String entityName, String clusterName, Date nominalTime, String entityType){ EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); - Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_FEED_ALERT_INSTANCE); - q.setParameter("feedName", feedName); - q.setParameter("clusterName", clusterName); - q.setParameter("nominalTime", nominalTime); + Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ENTITY_ALERT_INSTANCE); + q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName); + q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName); + q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime); + q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType); try{ q.executeUpdate(); } finally { @@ -245,7 +263,7 @@ public class MonitoringJdbcStateStore { } - public List<FeedSLAAlertBean> getSLAHighCandidates() { + public List<EntitySLAAlertBean> getSLAHighCandidates() { EntityManager entityManager = getEntityManager(); beginTransaction(entityManager); Query q = entityManager.createNamedQuery(PersistenceConstants.GET_SLA_HIGH_CANDIDATES); http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java index c6903a4..895f8b2 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java @@ -31,7 +31,7 @@ import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.UnschedulableEntityException; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.monitors.Dimension; -import org.apache.falcon.service.FeedSLAMonitoringService; +import org.apache.falcon.service.EntitySLAMonitoringService; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.hadoop.security.authorize.AuthorizationException; @@ -162,11 +162,11 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM Date end = (endStr == null) ? new Date() : EntityUtil.parseDateUTC(endStr); if (StringUtils.isBlank(feedName)) { - instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(start, end)); + instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end)); } else { for (String clusterName : DeploymentUtil.getCurrentClusters()) { - instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(feedName, - clusterName, start, end)); + instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(feedName, + clusterName, start, end, EntityType.FEED.toString())); } } } catch (FalconException e) { http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java index 53a9de1..249c273 100644 --- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java +++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java @@ -134,7 +134,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana @Override protected SchedulableEntityInstanceResult doExecute(String colo) throws FalconException { - return getEntityManager(colo).invoke("getFeedSLAMissPendingAlerts", entityType, entityName, + return getEntityManager(colo).invoke("getEntitySLAMissPendingAlerts", entityType, entityName, start, end, colo); } }.execute(); http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java new file mode 100644 index 0000000..f023c35 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.service; + + +import java.util.Date; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.ArrayList; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.jdbc.MonitoringJdbcStateStore; +import org.apache.falcon.persistence.PendingInstanceBean; +import org.apache.falcon.resource.SchedulableEntityInstance; +import org.apache.falcon.util.ReflectionUtils; +import org.apache.falcon.util.StartupProperties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to know which all feeds have missed SLA. + */ +public final class EntitySLAAlertService implements FalconService, EntitySLAListener { + + private static final String NAME = "EntitySLAAlertService"; + + private static final Logger LOG = LoggerFactory.getLogger(EntitySLAAlertService.class); + + private MonitoringJdbcStateStore store = new MonitoringJdbcStateStore(); + + private Set<EntitySLAListener> listeners = new LinkedHashSet<EntitySLAListener>(); + + private static final EntitySLAAlertService SERVICE = new EntitySLAAlertService(); + + public static EntitySLAAlertService get() { + return SERVICE; + } + + private EntitySLAAlertService(){} + + + @Override + public String getName() { + return NAME; + } + + @Override + public void init() throws FalconException { + String listenerClassNames = StartupProperties.get(). + getProperty("feedAlert.listeners"); + for (String listenerClassName : listenerClassNames.split(",")) { + listenerClassName = listenerClassName.trim(); + if (listenerClassName.isEmpty()) { + continue; + } + EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName); + registerListener(listener); + } + + String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600"); + int statusCheckFrequencySeconds = Integer.parseInt(freq); + + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds + 10, TimeUnit.SECONDS); + } + + public void registerListener(EntitySLAListener listener) { + listeners.add(listener); + } + + @Override + public void destroy() throws FalconException { + + } + + + private class Monitor implements Runnable { + + @Override + public void run() { + processSLACandidates(); + } + } + + void processSLACandidates(){ + //Get all feeds instances to be monitored + List<PendingInstanceBean> pendingInstanceBeanList = store.getAllInstances(); + if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){ + return; + } + + LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size()); + try{ + for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) { + + String entityName = pendingInstanceBean.getEntityName(); + String clusterName = pendingInstanceBean.getClusterName(); + Date nominalTime = pendingInstanceBean.getNominalTime(); + String entityType = pendingInstanceBean.getEntityType(); + + org.apache.falcon.entity.v0.cluster.Cluster cluster = ClusterHelper.getCluster(clusterName); + + Set<SchedulableEntityInstance> schedulableEntityInstances= EntitySLAMonitoringService.get(). + getEntitySLAMissPendingAlerts(entityName, cluster.getName(), nominalTime, nominalTime + , entityType); + if (schedulableEntityInstances.isEmpty()){ + store.deleteEntityAlertInstance(entityName, cluster.getName(), nominalTime, + entityType); + return; + } + List<SchedulableEntityInstance> schedulableEntityList = new ArrayList<>(schedulableEntityInstances); + SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0); + + + if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_WARN)) { + store.putSLAAlertInstance(entityName, clusterName, entityType, + nominalTime, true, false); + //Mark in DB as SLA missed + LOG.info("Feed :"+ entityName + + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLALow"); + } else if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_CRITICAL)){ + if (entityType.equals(EntityType.PROCESS.name())){ + store.putSLAAlertInstance(entityName, clusterName, entityType, + nominalTime, true, false); + } + store.updateSLAAlertInstance(entityName, clusterName, nominalTime, entityType); + LOG.info("Entity :"+ entityName + + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "EntityType:"+ entityType + + "missed SLAHigh"); + highSLAMissed(entityName, clusterName, entityType, nominalTime); + } + } + } catch (FalconException e){ + LOG.error("Exception in FeedSLAALertService:", e); + } + + } + + @Override + public void highSLAMissed(String entityName, String clusterName, String entityType , Date nominalTime + ) throws FalconException { + for (EntitySLAListener listener : listeners) { + listener.highSLAMissed(entityName, clusterName, entityType, nominalTime); + store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java index 991052f..421ea38 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java @@ -18,7 +18,6 @@ package org.apache.falcon.service; import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.EntityType; import java.util.Date; @@ -26,6 +25,6 @@ import java.util.Date; * Interface for FeedSLAAlert to be used by Listeners. */ public interface EntitySLAListener { - void highSLAMissed(String enityName, EntityType entityType, String clusterName, Date nominalTime) + void highSLAMissed(String entityName, String clusterName, String entityType, Date nominalTime) throws FalconException; }
