Repository: falcon Updated Branches: refs/heads/master 49fa46e29 -> 1f28bde6f
FALCON-2044 Persist Process stats in db Author: Praveen Adlakha <[email protected]> Reviewers: Ajay Yadava <[email protected]> Closes #196 from PraveenAdlakha/2044 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/1f28bde6 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/1f28bde6 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/1f28bde6 Branch: refs/heads/master Commit: 1f28bde6f49aedd2ca95181f483c609a5304aecc Parents: 49fa46e Author: Praveen Adlakha <[email protected]> Authored: Tue Nov 29 13:09:23 2016 -0500 Committer: Ajay Yadava <[email protected]> Committed: Tue Nov 29 13:09:23 2016 -0500 ---------------------------------------------------------------------- .../persistence/PersistenceConstants.java | 2 +- .../persistence/ProcessInstanceInfoBean.java | 131 +++++++++++++++++++ .../falcon/tools/FalconStateStoreDBCLI.java | 1 + .../src/main/resources/META-INF/persistence.xml | 16 ++- common/src/main/resources/startup.properties | 3 +- .../site/twiki/GraphiteMetricCollection.twiki | 24 ---- docs/src/site/twiki/MetricCollection.twiki | 37 ++++++ docs/src/site/twiki/Operability.twiki | 6 +- .../falcon/jdbc/MonitoringJdbcStateStore.java | 36 +++++ .../plugin/ProcessExecutionStatsPlugin.java | 76 +++++++++++ .../jdbc/MonitoringJdbcStateStoreTest.java | 11 ++ src/build/findbugs-exclude.xml | 5 + src/conf/startup.properties | 5 +- 13 files changed, 317 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index fc82ae7..26a5cd4 100644 --- 
a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -79,5 +79,5 @@ public final class PersistenceConstants { public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS"; public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB"; public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB"; - + public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java b/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java new file mode 100644 index 0000000..c408510 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.persistence; + +import javax.persistence.Entity; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Column; +import javax.validation.constraints.NotNull; +import java.util.Date; + +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** + * Class to store info regarding process history. + */ +@Entity +@NamedQueries({ + @NamedQuery(name= PersistenceConstants.GET_ALL_PROCESS_INFO_INSTANCES , query = "select OBJECT(a) from ProcessInstanceInfoBean a ") +}) +@Table(name = "ProcessInstanceInfo") +//RESUME CHECKSTYLE CHECK LineLengthCheck +public class ProcessInstanceInfoBean { + @NotNull + @GeneratedValue(strategy = GenerationType.AUTO) + @Id + private String id; + + @NotNull + @Column(name = "process_name") + private String processName; + + @NotNull + @Column(name = "colo") + private String colo; + + public String getPipeline() { + return pipeline; + } + + public void setPipeline(String pipeline) { + this.pipeline = pipeline; + } + + @NotNull + @Column(name = "pipeline") + private String pipeline; + + @NotNull + @Column(name = "status") + private String status; + + @NotNull + @Column(name = "nominal_time") + private Date nominalTime; + + @NotNull + @Column(name = "start_delay") + private long startDelay; + + @NotNull + @Column(name = "processing_time") + private long processingTime; + + public Date getNominalTime() { + return nominalTime; + } + + public void setNominalTime(Date nominalTime) { + this.nominalTime = nominalTime; + } + + public String getProcessName() { + return processName; + } + + public void setProcessName(String processName) { + this.processName = processName; + } + + public String getColo() { + return colo; + } + + public void setColo(String colo) { + this.colo = colo; + } + + public long getStartDelay() { + return startDelay; + } + 
+ public void setStartDelay(long startDelay) { + this.startDelay = startDelay; + } + + public long getProcessingTime() { + return processingTime; + } + + public void setProcessingTime(long processingTime) { + this.processingTime = processingTime; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java index 0c04da3..6ad887e 100644 --- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java +++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java @@ -249,6 +249,7 @@ public class FalconStateStoreDBCLI { args.add("org.apache.falcon.persistence.BacklogMetricBean"); args.add("org.apache.falcon.persistence.ExtensionBean"); args.add("org.apache.falcon.persistence.ExtensionJobsBean"); + args.add("org.apache.falcon.persistence.ProcessInstanceInfoBean"); return args.toArray(new String[args.size()]); } http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml index 0f20103..8d0bd25 100644 --- a/common/src/main/resources/META-INF/persistence.xml +++ b/common/src/main/resources/META-INF/persistence.xml @@ -31,7 +31,7 @@ <class>org.apache.falcon.persistence.BacklogMetricBean</class> <class>org.apache.falcon.persistence.ExtensionBean</class> <class>org.apache.falcon.persistence.ExtensionJobsBean</class> - + <class>org.apache.falcon.persistence.ProcessInstanceInfoBean</class> 
<properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -41,8 +41,8 @@ value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean; - org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean)"/> - + org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean; + org.apache.falcon.persistence.ProcessInstanceInfoBean)"/> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> <property name="openjpa.ReadLockLevel" value="read"/> @@ -67,6 +67,7 @@ <class>org.apache.falcon.persistence.BacklogMetricBean</class> <class>org.apache.falcon.persistence.ExtensionBean</class> <class>org.apache.falcon.persistence.ExtensionJobsBean</class> + <class>org.apache.falcon.persistence.ProcessInstanceInfoBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -76,8 +77,8 @@ value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean; - org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean)"/> - + org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean; + org.apache.falcon.persistence.ProcessInstanceInfoBean)"/> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> <property name="openjpa.ReadLockLevel" value="read"/> @@ -101,6 +102,7 @@ 
<class>org.apache.falcon.persistence.BacklogMetricBean</class> <class>org.apache.falcon.persistence.ExtensionBean</class> <class>org.apache.falcon.persistence.ExtensionJobsBean</class> + <class>org.apache.falcon.persistence.ProcessInstanceInfoBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -110,8 +112,8 @@ value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean; - org.apache.falcon.persistence.BacklogMetricBean;org.apache.falcon.persistence.ExtensionBean; - org.apache.falcon.persistence.ExtensionJobsBean)"/> + org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean; + org.apache.falcon.persistence.ProcessInstanceInfoBean)"/> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> <property name="openjpa.ReadLockLevel" value="read"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 5d5da5a..f91f3b6 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -313,7 +313,8 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle # Setting monitoring plugin, if SMTP parameters is defined #*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\ -# org.apache.falcon.plugin.EmailNotificationPlugin +# org.apache.falcon.plugin.EmailNotificationPlugin,\ +# org.apache.falcon.plugin.ProcessExecutionStatsPlugin ######### StateStore Properties ##### 
#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/docs/src/site/twiki/GraphiteMetricCollection.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/GraphiteMetricCollection.twiki b/docs/src/site/twiki/GraphiteMetricCollection.twiki deleted file mode 100644 index 0ae0498..0000000 --- a/docs/src/site/twiki/GraphiteMetricCollection.twiki +++ /dev/null @@ -1,24 +0,0 @@ ----++Graphite Metric Collection - -Graphite Metric Collection currently allows to collect the following metrics at process level : - -1. Processing time the process spent in the running state in seconds (workflow_end_time - workflow_start_time) -2. Wait time that the process spent in the waiting/ready state. (workflow_start_time - workflow_nominal_time) -3. Number of instances that are failed for a process. - -To send data to graphite we need to intialize metricNotificationService in startup.properties: - -<verbatim> -*.application.services= org.apache.falcon.metrics.MetricNotificationService, -</verbatim> - -Add following properties for graphiteNotificationPlugin : - -*Graphite properties* -<verbatim> - * *.falcon.graphite.hostname=localhost - * *.falcon.graphite.port=2003 - * *.falcon.graphite.frequency=1 - * *.falcon.graphite.prefix=falcon -</verbatim> -The falcon.graphite.frequency is in seconds and all the time that is being sent to graphite is in seconds. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/docs/src/site/twiki/MetricCollection.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/MetricCollection.twiki b/docs/src/site/twiki/MetricCollection.twiki new file mode 100644 index 0000000..636c739 --- /dev/null +++ b/docs/src/site/twiki/MetricCollection.twiki @@ -0,0 +1,37 @@ +---++Metric Collection + +Metric Collection currently allows collecting the following metrics at process level: + + 1. Processing time the process spent in the running state in seconds (workflow_end_time - workflow_start_time) + 1. Wait time that the process spent in the waiting/ready state. (workflow_start_time - workflow_nominal_time) + 1. Number of instances that failed for a process. + +To send data to *Graphite* + +Falcon needs to initialize metricNotificationService in startup.properties: + +<verbatim> +*.application.services= org.apache.falcon.metrics.MetricNotificationService, +</verbatim> + +Add the following properties for graphiteNotificationPlugin: + +*Graphite properties* +<verbatim> + * *.falcon.graphite.hostname=localhost + * *.falcon.graphite.port=2003 + * *.falcon.graphite.frequency=1 + * *.falcon.graphite.prefix=falcon +</verbatim> +The falcon.graphite.frequency is in seconds, and all times sent to graphite are in seconds. + + +To send data to *Falcon DB* + +Falcon needs the *!ProcessInstanceInfo* table in the database; have a look at [[FalconDatabase]] to know how to create it. 
+ +Add the following property in startup.properties: + +<verbatim> +#*.monitoring.plugins=org.apache.falcon.plugin.ProcessExecutionStatsPlugin +</verbatim> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/docs/src/site/twiki/Operability.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/Operability.twiki b/docs/src/site/twiki/Operability.twiki index f01c235..e21ada8 100644 --- a/docs/src/site/twiki/Operability.twiki +++ b/docs/src/site/twiki/Operability.twiki @@ -228,6 +228,8 @@ Users may also extend the Falcon Audit plugin to send audits to systems like Apa extending org.apache.falcon.plugin.AuditingPlugin interface. ----++ Metrics Collection In Graphite +---++ Metrics Collection In Graphite and Database -Falcon has support to send metrics to graphite more details regarding this can be found on [[GraphiteMetricCollection][Graphite Metric Collection]] \ No newline at end of file +Falcon has support to send process metrics like waiting time, execution time and number of failures to graphite and falcon db. 
+ +For details go through [[MetricCollection][Metric Collection]] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java index 552ebde..669e18d 100644 --- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java +++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java @@ -25,6 +25,7 @@ import org.apache.falcon.persistence.PendingInstanceBean; import org.apache.falcon.persistence.PersistenceConstants; import org.apache.falcon.persistence.ResultNotFoundException; import org.apache.falcon.persistence.EntitySLAAlertBean; +import org.apache.falcon.persistence.ProcessInstanceInfoBean; import org.apache.falcon.service.FalconJPAService; import javax.persistence.EntityManager; @@ -198,6 +199,41 @@ public class MonitoringJdbcStateStore { return result; } + public void putProcessInstance(String processName, String colo, Long nominalTime, Long startDelay, + Long processingTime, String pipeline, String status){ + ProcessInstanceInfoBean processInstanceInfoBean = new ProcessInstanceInfoBean(); + processInstanceInfoBean.setProcessName(processName); + processInstanceInfoBean.setColo(colo); + processInstanceInfoBean.setNominalTime(new Date(nominalTime)); + processInstanceInfoBean.setStartDelay(startDelay); + processInstanceInfoBean.setProcessingTime(processingTime); + processInstanceInfoBean.setPipeline(pipeline); + processInstanceInfoBean.setStatus(status); + + EntityManager entityManager = getEntityManager(); + try { + beginTransaction(entityManager); + entityManager.persist(processInstanceInfoBean); + } finally { + commitAndCloseTransaction(entityManager); + } + } + + public List<ProcessInstanceInfoBean> 
getAllInstancesProcessInstance(){ + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PROCESS_INFO_INSTANCES); + List result = q.getResultList(); + + try { + if (CollectionUtils.isEmpty(result)) { + return null; + } + } finally{ + entityManager.close(); + } + return result; + } + private void commitAndCloseTransaction(EntityManager entityManager) { entityManager.getTransaction().commit(); entityManager.close(); http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java new file mode 100644 index 0000000..676c17b --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.plugin; + +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.aspect.ResourceMessage; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.jdbc.MonitoringJdbcStateStore; +import org.joda.time.DateTime; +import org.joda.time.Seconds; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This plugin writes process completion time ,number of failures and wait time to DB. + */ +public class ProcessExecutionStatsPlugin implements MonitoringPlugin { + private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionStatsPlugin.class); + + private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore(); + + @Override + public void monitor(ResourceMessage message) { + try { + String entityType = StringUtils.isNotBlank(message.getDimensions().get("entityType")) + ? message.getDimensions().get("entityType") :message.getDimensions().get("entity-type"); + String entityName = StringUtils.isNotBlank(message.getDimensions().get("entityName")) + ? message.getDimensions().get("entityName") :message.getDimensions().get("entity-name"); + LOG.debug("message:" + message.getAction()); + if (entityType.equalsIgnoreCase(EntityType.PROCESS.name()) + && ConfigurationStore.get().get(EntityType.PROCESS, entityName) != null) { + Process process = ConfigurationStore.get().get(EntityType.PROCESS, entityName); + String pipelines = StringUtils.isNotBlank(process.getPipelines()) ? 
process.getPipelines() + : "__untagged"; + String cluster = message.getDimensions().get("cluster"); + DateTime nominalTime = new DateTime(message.getDimensions().get("nominal-time")); + DateTime startTime = new DateTime(message.getDimensions().get("start-time")); + Long startDelay = (long) Seconds.secondsBetween(nominalTime, startTime).getSeconds(); + Long timeTaken = message.getExecutionTime() / 1000000000; + + String [] pipelineNames = pipelines.split(","); + + for(String name : pipelineNames){ + + if ((message.getAction().equals("wf-instance-succeeded"))) { + MONITORING_JDBC_STATE_STORE.putProcessInstance(entityName, cluster, nominalTime.getMillis(), + startDelay, timeTaken, name, "succeeded"); + } + if (message.getAction().equals("wf-instance-failed")){ + MONITORING_JDBC_STATE_STORE.putProcessInstance(entityName, cluster, nominalTime.getMillis(), + startDelay, timeTaken, name, "failed"); + } + } + } + } catch (Exception e) { + LOG.error("Exception in sending metrics to FalconDB:", e); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java index a64b654..860fbfc 100644 --- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java +++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java @@ -19,6 +19,7 @@ package org.apache.falcon.jdbc; import java.io.File; import java.util.Date; +import java.util.List; import javax.persistence.EntityManager; import javax.persistence.Query; @@ -27,6 +28,7 @@ import org.apache.falcon.cluster.util.EmbeddedCluster; import org.apache.falcon.entity.AbstractTestBase; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; 
+import org.apache.falcon.persistence.ProcessInstanceInfoBean; import org.apache.falcon.service.FalconJPAService; import org.apache.falcon.tools.FalconStateStoreDBCLI; import org.apache.falcon.util.StateStoreProperties; @@ -173,6 +175,15 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase { "test-cluster", dateOne, EntityType.PROCESS.toString()).getIsSLAHighMissed()); } + @Test + public void putProcessInstance() throws Exception{ + MonitoringJdbcStateStore store = new MonitoringJdbcStateStore(); + store.putProcessInstance("test-process", "test-colo", 1466602429423L, 99999999L, 99999999L, "test", "failed"); + List<ProcessInstanceInfoBean> list = store.getAllInstancesProcessInstance(); + ProcessInstanceInfoBean processInstanceInfoBean = list.get(0); + Assert.assertEquals("test-process", processInstanceInfoBean.getProcessName()); + } + private void clear() { EntityManager em = FalconJPAService.get().getEntityManager(); em.getTransaction().begin(); http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/src/build/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml index 04e267f..189f2f8 100644 --- a/src/build/findbugs-exclude.xml +++ b/src/build/findbugs-exclude.xml @@ -75,6 +75,11 @@ </Match> <Match> + <Class name="org.apache.falcon.persistence.ProcessInstanceInfoBean" /> + <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" /> + </Match> + + <Match> <Class name="org.apache.falcon.persistence.ExtensionJobsBean" /> <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" /> </Match> http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 6a95cce..901c3a9 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -358,8 +358,11 @@ 
prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ #*.falcon.email.smtp.password="" # Setting monitoring plugin, if SMTP parameters is defined +# DefaultMonitoringPlugin #*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\ -# org.apache.falcon.plugin.EmailNotificationPlugin +# org.apache.falcon.plugin.EmailNotificationPlugin,\ +# org.apache.falcon.plugin.ProcessExecutionStatsPlugin + # Graphite properties #*.falcon.graphite.hostname=localhost #*.falcon.graphite.port=2003
