Repository: falcon
Updated Branches:
  refs/heads/master e94dd72fa -> c7996deb7
FALCON-2059 BacklogMetricEmitter Service for Falcon Processes Author: pavan.kolamuri <[email protected]> Author: Pavan Kolamuri <[email protected]> Reviewers: @pallavi-rao Closes #212 from pavankumar526/master and squashes the following commits: ad84f0f [Pavan Kolamuri] Fixed checkstyle issues dea8f93 [pavan.kolamuri] Added doc in startup.properties dbe3a7f [pavan.kolamuri] Added more log statements d72d228 [pavan.kolamuri] Exception changed to throwable 46dcef8 [pavan.kolamuri] Fixed bug in oozieworkflowengine FALCON-2059 e92d3bc [pavan.kolamuri] Add isMissing method FALCON-2059 6d6cf81 [pavan.kolamuri] Handled when entity was deleted FALCON-2059 6c03701 [pavan.kolamuri] Fixed User authentication issue in oozie 81f0b03 [pavan.kolamuri] Rebased the patch e3dbe88 [pavan.kolamuri] Handled multiple pipelines processes FALCON-2059 b5d9e70 [pavan.kolamuri] Addressed based on comments FALCON-2059 fb78fba [pavan.kolamuri] Refactored changes based on EntitySLAAlert service 80c015a [pavan.kolamuri] FALCON-2059 BacklogMetricEmitter Service for Falcon Processes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c7996deb Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c7996deb Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c7996deb Branch: refs/heads/master Commit: c7996deb76819fc7ef98dd28fd4dff19474b7602 Parents: e94dd72 Author: pavan.kolamuri <[email protected]> Authored: Fri Jul 29 08:40:49 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Fri Jul 29 08:40:49 2016 +0530 ---------------------------------------------------------------------- .../falcon/persistence/BacklogMetricBean.java | 116 ++++++ .../persistence/PersistenceConstants.java | 2 + .../falcon/tools/FalconStateStoreDBCLI.java | 1 + .../workflow/engine/AbstractWorkflowEngine.java | 2 + .../src/main/resources/META-INF/persistence.xml | 9 +- common/src/main/resources/startup.properties | 4 + 
.../entity/parser/ProcessEntityParserTest.java | 2 +- .../resources/config/process/process-0.1.xml | 2 +- .../workflow/engine/OozieWorkflowEngine.java | 26 +- .../apache/falcon/jdbc/BacklogMetricStore.java | 121 +++++++ .../falcon/resource/channel/HTTPChannel.java | 5 +- .../service/BacklogMetricEmitterService.java | 356 +++++++++++++++++++ .../falcon/service/EntitySLAAlertService.java | 6 +- .../falcon/service/EntitySLAListener.java | 3 +- .../java/org/apache/falcon/util/MetricInfo.java | 79 ++++ .../BacklogMetricEmitterServiceTest.java | 133 +++++++ prism/src/test/resources/startup.properties | 338 ++++++++++++++++++ .../workflow/engine/FalconWorkflowEngine.java | 5 + src/build/findbugs-exclude.xml | 6 + src/conf/startup.properties | 16 + 20 files changed, 1217 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java b/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java new file mode 100644 index 0000000..b563da7 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.persistence; + +import org.apache.openjpa.persistence.jdbc.Index; + +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; +import javax.validation.constraints.NotNull; +import java.util.Date; + +//SUSPEND CHECKSTYLE CHECK LineLengthCheck +/** + * Backlog Metric Object stored in DB. 
+ */ +@Entity +@NamedQueries({ + @NamedQuery(name = PersistenceConstants.GET_ALL_BACKLOG_INSTANCES, query = "select OBJECT(a) from BacklogMetricBean a "), + @NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType") +}) +//RESUME CHECKSTYLE CHECK LineLengthCheck + +@Table(name = "BACKLOG_METRIC") +public class BacklogMetricBean { + + @NotNull + @GeneratedValue(strategy = GenerationType.AUTO) + @Id + private String id; + + @Basic + @NotNull + @Index + @Column(name = "entity_name") + private String entityName; + + @Basic + @NotNull + @Column(name = "cluster_name") + private String clusterName; + + @Basic + @NotNull + @Index + @Column(name = "nominal_time") + private Date nominalTime; + + @Basic + @NotNull + @Index + @Column(name = "entity_type") + private String entityType; + + + public String getId() { + return id; + } + + public String getEntityName() { + return entityName; + } + + public String getClusterName() { + return clusterName; + } + + public Date getNominalTime() { + return nominalTime; + } + + public void setId(String id) { + this.id = id; + } + + public void setEntityName(String entityName) { + this.entityName = entityName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public void setNominalTime(Date nominalTime) { + this.nominalTime = nominalTime; + } + + public String getEntityType() { + return entityType; + } + + public void setEntityType(String entityType) { + this.entityType = entityType; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java 
b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java index 7c2479d..5c3de51 100644 --- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java +++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java @@ -61,5 +61,7 @@ public final class PersistenceConstants { public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH"; public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE"; public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE"; + public static final String DELETE_BACKLOG_METRIC_INSTANCE = "DELETE_BACKLOG_METRIC_INSTANCE"; + public static final String GET_ALL_BACKLOG_INSTANCES = "GET_ALL_BACKLOG_INSTANCES"; public static final String GET_ALL_MONITORING_ENTITY = "GET_ALL_MONITORING_ENTITY"; } http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java index 102b986..9c6e8b3 100644 --- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java +++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java @@ -246,6 +246,7 @@ public class FalconStateStoreDBCLI { args.add("org.apache.falcon.persistence.PendingInstanceBean"); args.add("org.apache.falcon.persistence.MonitoredEntityBean"); args.add("org.apache.falcon.persistence.EntitySLAAlertBean"); + args.add("org.apache.falcon.persistence.BacklogMetricBean"); return args.toArray(new String[args.size()]); } http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java index 4d8402a..0db7e9b 100644 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java @@ -72,6 +72,8 @@ public abstract class AbstractWorkflowEngine { public abstract boolean isCompleted(Entity entity) throws FalconException; + public abstract boolean isMissing(Entity entity) throws FalconException; + public abstract InstancesResult getRunningInstances(Entity entity, List<LifeCycle> lifeCycles) throws FalconException; http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml index ac2f397..d58e21c 100644 --- a/common/src/main/resources/META-INF/persistence.xml +++ b/common/src/main/resources/META-INF/persistence.xml @@ -28,6 +28,7 @@ <class>org.apache.falcon.persistence.PendingInstanceBean</class> <class>org.apache.falcon.persistence.MonitoredEntityBean</class> <class>org.apache.falcon.persistence.EntitySLAAlertBean</class> + <class>org.apache.falcon.persistence.BacklogMetricBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -60,7 +61,7 @@ <class>org.apache.falcon.persistence.PendingInstanceBean</class> <class>org.apache.falcon.persistence.MonitoredEntityBean</class> <class>org.apache.falcon.persistence.EntitySLAAlertBean</class> - + <class>org.apache.falcon.persistence.BacklogMetricBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -91,7 +92,7 @@ 
<class>org.apache.falcon.persistence.MonitoredEntityBean</class> <class>org.apache.falcon.persistence.PendingInstanceBean</class> <class>org.apache.falcon.persistence.EntitySLAAlertBean</class> - + <class>org.apache.falcon.persistence.BacklogMetricBean</class> <properties> <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> @@ -100,8 +101,8 @@ <property name="openjpa.MetaDataFactory" value="jpa(Types=org.apache.falcon.persistence.EntityBean; org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean; - org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property> - + org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean; + org.apache.falcon.persistence.BacklogMetricBean)"/> <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> <property name="openjpa.LockManager" value="pessimistic"/> <property name="openjpa.ReadLockLevel" value="read"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index de24621..4b692a2 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -332,3 +332,7 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle #*.falcon.graphite.port=2003 #*.falcon.graphite.frequency=1 #*.falcon.graphite.prefix=falcon + +# Backlog Metric Properties +#*.falcon.backlog.metricservice.emit.interval.millisecs=60000 +#*.falcon.backlog.metricservice.recheck.interval.millisecs=600000 http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java 
---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java index c4bfff6..3398c26 100644 --- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java +++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java @@ -103,7 +103,7 @@ public class ProcessEntityParserTest extends AbstractTestBase { Assert.assertEquals(process.getTags(), "[email protected], [email protected], _department_type=forecasting"); - Assert.assertEquals(process.getPipelines(), "testPipeline,dataReplication_Pipeline"); + Assert.assertEquals(process.getPipelines(), "testPipeline"); Assert.assertEquals(process.getInputs().getInputs().get(0).getName(), "impression"); Assert.assertEquals(process.getInputs().getInputs().get(0).getFeed(), "impressionFeed"); http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/test/resources/config/process/process-0.1.xml ---------------------------------------------------------------------- diff --git a/common/src/test/resources/config/process/process-0.1.xml b/common/src/test/resources/config/process/process-0.1.xml index 4ce7ad1..1550101 100644 --- a/common/src/test/resources/config/process/process-0.1.xml +++ b/common/src/test/resources/config/process/process-0.1.xml @@ -18,7 +18,7 @@ --> <process name="sample" version="0" xmlns="uri:falcon:process:0.1"> <tags>[email protected], [email protected], _department_type=forecasting</tags> - <pipelines>testPipeline,dataReplication_Pipeline</pipelines> + <pipelines>testPipeline</pipelines> <clusters> <cluster name="testCluster"> <validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java 
---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 9a09f18..38a6c00 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -272,6 +272,24 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { || isBundleInState(bundles, BundleStatus.KILLED)); } + @Override + public boolean isMissing(Entity entity) throws FalconException { + List<String> bundlesToRemove = new ArrayList<>(); + Map<String, BundleJob> bundles = findLatestBundle(entity); + for (Map.Entry<String, BundleJob> clusterBundle : bundles.entrySet()) { + if (clusterBundle.getValue() == MISSING) { // There is no active bundle for this cluster + bundlesToRemove.add(clusterBundle.getKey()); + } + } + for (String bundleToRemove : bundlesToRemove) { + bundles.remove(bundleToRemove); + } + if (bundles.size() == 0) { + return true; + } + return false; + } + private enum BundleStatus { ACTIVE, RUNNING, SUSPENDED, FAILED, KILLED, SUCCEEDED } @@ -1216,9 +1234,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private boolean isCoordApplicable(String appName, List<LifeCycle> lifeCycles) { - for (LifeCycle lifeCycle : lifeCycles) { - if (appName.contains(lifeCycle.getTag().name())) { - return true; + if (lifeCycles != null && !lifeCycles.isEmpty()) { + for (LifeCycle lifeCycle : lifeCycles) { + if (appName.contains(lifeCycle.getTag().name())) { + return true; + } } } return false; http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java 
b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java new file mode 100644 index 0000000..ef9a396 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.jdbc; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.persistence.BacklogMetricBean; +import org.apache.falcon.persistence.PersistenceConstants; +import org.apache.falcon.service.BacklogMetricEmitterService; +import org.apache.falcon.service.FalconJPAService; +import org.apache.falcon.util.MetricInfo; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Backlog Metric Store for entitties. 
+ */ +public class BacklogMetricStore { + + private EntityManager getEntityManager() { + return FalconJPAService.get().getEntityManager(); + } + + + public void addInstance(String entityName, String cluster, Date nominalTime, EntityType entityType) { + BacklogMetricBean backlogMetricBean = new BacklogMetricBean(); + backlogMetricBean.setClusterName(cluster); + backlogMetricBean.setEntityName(entityName); + backlogMetricBean.setNominalTime(nominalTime); + backlogMetricBean.setEntityType(entityType.name()); + EntityManager entityManager = getEntityManager(); + try { + beginTransaction(entityManager); + entityManager.persist(backlogMetricBean); + } finally { + commitAndCloseTransaction(entityManager); + } + } + + public synchronized void deleteMetricInstance(String entityName, String cluster, Date nominalTime, + EntityType entityType) { + EntityManager entityManager = getEntityManager(); + beginTransaction(entityManager); + Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE); + q.setParameter("entityName", entityName); + q.setParameter("clusterName", cluster); + q.setParameter("nominalTime", nominalTime); + q.setParameter("entityType", entityType.name()); + try{ + q.executeUpdate(); + } finally { + commitAndCloseTransaction(entityManager); + } + } + + + private void beginTransaction(EntityManager entityManager) { + entityManager.getTransaction().begin(); + } + + private void commitAndCloseTransaction(EntityManager entityManager) { + if (entityManager != null) { + entityManager.getTransaction().commit(); + entityManager.close(); + } + } + + public Map<Entity, List<MetricInfo>> getAllInstances() throws FalconException { + EntityManager entityManager = getEntityManager(); + Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_BACKLOG_INSTANCES); + List<BacklogMetricBean> result = q.getResultList(); + + try { + if (CollectionUtils.isEmpty(result)) { + return null; + } + } finally{ + entityManager.close(); + } + + 
Map<Entity, List<MetricInfo>> backlogMetrics = new HashMap<>(); + for (BacklogMetricBean backlogMetricBean : result) { + Entity entity = EntityUtil.getEntity(backlogMetricBean.getEntityType(), + backlogMetricBean.getEntityName()); + if (!backlogMetrics.containsKey(entity)) { + backlogMetrics.put(entity, new ArrayList<MetricInfo>()); + } + List<MetricInfo> metrics = backlogMetrics.get(entity); + MetricInfo metricInfo = new MetricInfo(BacklogMetricEmitterService.DATE_FORMAT.get() + .format(backlogMetricBean.getNominalTime()), + backlogMetricBean.getClusterName()); + metrics.add(metricInfo); + backlogMetrics.put(entity, metrics); + } + return backlogMetrics; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java index a63ae63..187d6c7 100644 --- a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java +++ b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java @@ -46,6 +46,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status.Family; import javax.ws.rs.core.UriBuilder; +import java.io.IOException; import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.List; @@ -138,8 +139,8 @@ public class HTTPChannel extends AbstractChannel { if (incomingRequest != null) { incomingRequest.getInputStream().reset(); } - } catch (Exception ignore) { - // nothing to be done; + } catch (IOException e) { + LOG.error("Error in HTTPChannel", e); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java 
---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java new file mode 100644 index 0000000..801ab36 --- /dev/null +++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.service; + +import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.jdbc.BacklogMetricStore; +import org.apache.falcon.metrics.MetricNotificationService; +import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.util.MetricInfo; +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.WorkflowExecutionContext; +import org.apache.falcon.workflow.WorkflowExecutionListener; +import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.apache.falcon.workflow.WorkflowEngineFactory.getWorkflowEngine; + +/** + * Backlog Metric Emitter Service to publish metrics to Graphite. 
+ */
+public final class BacklogMetricEmitterService implements FalconService,
+        EntitySLAListener, WorkflowExecutionListener {
+
+    private static final String METRIC_PREFIX = "falcon";
+    private static final String METRIC_SEPARATOR = ".";
+    private static final String BACKLOG_METRIC_EMIT_INTERVAL = "falcon.backlog.metricservice.emit.interval.millisecs";
+    private static final String BACKLOG_METRIC_RECHECK_INTERVAL = "falcon.backlog.metricservice."
+            + "recheck.interval.millisecs";
+    private static final String DEFAULT_PIPELINE = "DEFAULT";
+
+    private static final Logger LOG = LoggerFactory.getLogger(BacklogMetricEmitterService.class);
+
+    private static BacklogMetricStore backlogMetricStore = new BacklogMetricStore();
+
+    private static final BacklogMetricEmitterService SERVICE = new BacklogMetricEmitterService();
+
+    // NOTE(review): looked up once, when this class is initialised, so MetricNotificationService presumably has
+    // to be registered before this service is first used -- confirm the ordering in *.application.services.
+    private static MetricNotificationService metricNotificationService =
+            Services.get().getService(MetricNotificationService.SERVICE_NAME);
+
+    public static BacklogMetricEmitterService get() {
+        return SERVICE;
+    }
+
+    private BacklogMetricEmitterService() {
+    }
+
+    // Drives BacklogMetricEmitter (scheduled in init()).
+    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor1 = new ScheduledThreadPoolExecutor(1);
+    // Drives BacklogCheckService (scheduled in init()).
+    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor2 = new ScheduledThreadPoolExecutor(1);
+
+
+    /**
+     * Per-thread UTC formatter used to turn nominal times into the string keys kept in {@link MetricInfo}.
+     * A ThreadLocal is used because SimpleDateFormat is not thread-safe.
+     */
+    public static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = new ThreadLocal<SimpleDateFormat>() {
+        @Override
+        protected SimpleDateFormat initialValue() {
+            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH-mm'Z'");
+            format.setTimeZone(TimeZone.getTimeZone("UTC"));
+            return format;
+        }
+    };
+
+    /**
+     * Outstanding (high-SLA-missed, not yet succeeded) instances per process. Every value is a
+     * Collections.synchronizedList, so iteration and check-then-act sequences must hold the list's monitor.
+     */
+    private static ConcurrentHashMap<Entity, List<MetricInfo>> entityBacklogs = new ConcurrentHashMap<>();
+
+    /**
+     * Records a process instance that missed its high SLA so its backlog is emitted until it succeeds.
+     * Non-process entities are ignored.
+     *
+     * <p>Synchronized on the service, like {@link #onSuccess}, because onSuccess may remove the entity's list
+     * from {@code entityBacklogs}; without this an instance could be appended to a list that is no longer in
+     * the map and would never be emitted or cleaned up.</p>
+     *
+     * @throws FalconException if the entity cannot be resolved or the instance cannot be persisted
+     */
+    @Override
+    public synchronized void highSLAMissed(String entityName, String clusterName, EntityType entityType,
+                                           Date nominalTime) throws FalconException {
+
+        if (entityType != EntityType.PROCESS) {
+            return;
+        }
+        Entity entity = EntityUtil.getEntity(entityType, entityName);
+        entityBacklogs.putIfAbsent(entity, Collections.synchronizedList(new ArrayList<MetricInfo>()));
+        List<MetricInfo> metricInfoList = entityBacklogs.get(entity);
+        String nominalTimeStr = DATE_FORMAT.get().format(nominalTime);
+        MetricInfo metricInfo = new MetricInfo(nominalTimeStr, clusterName);
+        // contains() and add() must be one atomic step under the list lock, otherwise the same instance
+        // could be persisted twice.
+        synchronized (metricInfoList) {
+            if (!metricInfoList.contains(metricInfo)) {
+                backlogMetricStore.addInstance(entityName, clusterName, nominalTime, entityType);
+                metricInfoList.add(metricInfo);
+            }
+        }
+    }
+
+    @Override
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+
+    /**
+     * Reloads persisted backlog instances, then schedules the emitter and the re-check task using the
+     * intervals from startup properties (default 60000 ms each).
+     */
+    @Override
+    public void init() throws FalconException {
+        initInstances();
+        int emitInterval = Integer.parseInt(StartupProperties.get().getProperty(BACKLOG_METRIC_EMIT_INTERVAL,
+                "60000"));
+        int recheckInterval = Integer.parseInt(StartupProperties.get().getProperty(BACKLOG_METRIC_RECHECK_INTERVAL,
+                "60000"));
+        scheduledThreadPoolExecutor1.scheduleAtFixedRate(new BacklogMetricEmitter(),
+                1, emitInterval, TimeUnit.MILLISECONDS);
+        scheduledThreadPoolExecutor2.scheduleAtFixedRate(new BacklogCheckService(),
+                1, recheckInterval, TimeUnit.MILLISECONDS);
+    }
+
+    /** Loads the backlog instances stored in the state store into {@code entityBacklogs}. */
+    private void initInstances() throws FalconException {
+        LOG.info("Initializing backlog instances from state store");
+        Map<Entity, List<MetricInfo>> backlogInstances = backlogMetricStore.getAllInstances();
+        if (backlogInstances != null && !backlogInstances.isEmpty()) {
+            for (Map.Entry<Entity, List<MetricInfo>> entry : backlogInstances.entrySet()) {
+                List<MetricInfo> metricsInDB = entry.getValue();
+                List<MetricInfo> metricInfoList = Collections.synchronizedList(metricsInDB);
+                entityBacklogs.put(entry.getKey(), metricInfoList);
+                LOG.debug("Backlog of entity " + entry.getKey().getName() + " for instances " + metricInfoList);
+            }
+        }
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        scheduledThreadPoolExecutor1.shutdown();
+        scheduledThreadPoolExecutor2.shutdown();
+    }
+
+    /**
+     * Removes a succeeded process instance from the backlog (memory and state store) and drops the entity
+     * once it has no outstanding instances left.
+     */
+    @Override
+    public synchronized void onSuccess(WorkflowExecutionContext context) throws FalconException {
+        Entity entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
+        if (entity.getEntityType() != EntityType.PROCESS) {
+            return;
+        }
+        List<MetricInfo> metrics = entityBacklogs.get(entity);
+        if (metrics == null) {
+            return;
+        }
+        synchronized (metrics) {
+            Date date = SchemaHelper.parseDateUTC(context.getNominalTimeAsISO8601());
+            backlogMetricStore.deleteMetricInstance(entity.getName(), context.getClusterName(),
+                    date, entity.getEntityType());
+            metrics.remove(new MetricInfo(DATE_FORMAT.get().format(date), context.getClusterName()));
+            if (metrics.isEmpty()) {
+                entityBacklogs.remove(entity);
+            }
+        }
+    }
+
+    @Override
+    public void onFailure(WorkflowExecutionContext context) throws FalconException {
+        // Do Nothing
+    }
+
+    @Override
+    public void onStart(WorkflowExecutionContext context) throws FalconException {
+        // Do Nothing
+    }
+
+    @Override
+    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+        // Do Nothing
+    }
+
+    @Override
+    public void onWait(WorkflowExecutionContext context) throws FalconException {
+        // Do Nothing
+    }
+
+    /**
+     * Service which evaluates the backlog of every tracked entity in parallel and publishes it as metrics.
+     */
+    public static class BacklogMetricEmitter implements Runnable {
+
+        @Override
+        public void run() {
+            LOG.debug("BacklogMetricEmitter running for entities");
+            // A fresh pool per run; it is always shut down in the finally block.
+            ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
+            List<Future<?>> futures = new ArrayList<>();
+            try {
+                // Iterate entries instead of keySet() + get(): onSuccess() may remove an entity concurrently,
+                // and get() would then hand a null list to the calc task.
+                for (Map.Entry<Entity, List<MetricInfo>> entry : entityBacklogs.entrySet()) {
+                    futures.add(executor.submit(new BacklogCalcService(entry.getKey(), entry.getValue())));
+                }
+                waitForFuturesToComplete(futures);
+            } catch (Throwable e) {
+                // Never let an exception escape: scheduleAtFixedRate suppresses all later runs if one throws.
+                LOG.error("Error while emitting backlog metrics", e);
+            } finally {
+                executor.shutdown();
+            }
+        }
+
+        /** Waits for every task; a failed task is logged and does not stop waiting for the others. */
+        private void waitForFuturesToComplete(List<Future<?>> futures) {
+            for (Future<?> future : futures) {
+                try {
+                    future.get();
+                } catch (InterruptedException e) {
+                    LOG.error("Interruption while executing tasks", e);
+                    // Restore the interrupt flag for the scheduler thread and stop waiting.
+                    Thread.currentThread().interrupt();
+                    return;
+                } catch (ExecutionException e) {
+                    LOG.error("Error in executing threads", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Service which calculates backlog for given entity and publish to graphite.
+     */
+    public static class BacklogCalcService implements Runnable {
+
+        private final Entity entityObj;
+        private final List<MetricInfo> metrics;
+
+        BacklogCalcService(Entity entity, List<MetricInfo> metricInfoList) {
+            this.entityObj = entity;
+            this.metrics = metricInfoList;
+        }
+
+        /**
+         * Sums, per cluster, how long each outstanding instance has been pending (now minus nominal time)
+         * and publishes the total in minutes once for each pipeline of the process.
+         */
+        @Override
+        public void run() {
+            Map<String, Long> backLogsCluster = new HashMap<>();
+            // The list is a Collections.synchronizedList; iterating it requires holding its monitor.
+            synchronized (metrics) {
+                long currentTime = System.currentTimeMillis();
+                for (MetricInfo metricInfo : metrics) {
+                    try {
+                        long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime();
+                        Long backlog = backLogsCluster.get(metricInfo.getCluster());
+                        backLogsCluster.put(metricInfo.getCluster(),
+                                (backlog == null ? 0L : backlog) + (currentTime - time));
+                    } catch (ParseException e) {
+                        LOG.error("Unable to parse nominal time" + metricInfo.getNominalTime());
+                    }
+                }
+            }
+            org.apache.falcon.entity.v0.process.Process process = (Process) entityObj;
+            List<String> pipelines = splitPipelines(process.getPipelines());
+            for (Map.Entry<String, Long> entry : backLogsCluster.entrySet()) {
+                String clusterName = entry.getKey();
+                Long backlog = entry.getValue() / (60 * 1000L); // Converting to minutes
+                for (String pipeline : pipelines) {
+                    metricNotificationService.publish(buildMetricName(clusterName, pipeline), backlog);
+                }
+            }
+        }
+
+        /**
+         * Splits a comma-separated pipeline list, trimming names and skipping empty entries (which would
+         * otherwise yield malformed metric names); falls back to DEFAULT_PIPELINE when nothing remains.
+         */
+        private static List<String> splitPipelines(String pipelinesStr) {
+            List<String> pipelines = new ArrayList<>();
+            if (pipelinesStr != null) {
+                for (String pipeline : pipelinesStr.split(",")) {
+                    String name = pipeline.trim();
+                    if (!name.isEmpty()) {
+                        pipelines.add(name);
+                    }
+                }
+            }
+            if (pipelines.isEmpty()) {
+                pipelines.add(DEFAULT_PIPELINE);
+            }
+            return pipelines;
+        }
+
+        /** Builds {@code falcon.<cluster>.<pipeline>.EXECUTION.<entity>.backlogInMins}. */
+        private String buildMetricName(String clusterName, String pipeline) {
+            return METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+                    + pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
+                    + METRIC_SEPARATOR + entityObj.getName() + METRIC_SEPARATOR
+                    + "backlogInMins";
+        }
+    }
+
+
+    /**
+     * Service runs periodically and removes succeeded instances from backlog list.
+     */
+    public static class BacklogCheckService implements Runnable {
+
+        @Override
+        public void run() {
+            LOG.debug("BacklogCheckService running for entities");
+            try {
+                AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+                // Entries, not keySet() + get(): the value may be removed concurrently by onSuccess().
+                for (Map.Entry<Entity, List<MetricInfo>> entry : entityBacklogs.entrySet()) {
+                    Entity entity = entry.getKey();
+                    List<MetricInfo> metrics = entry.getValue();
+                    if (metrics.isEmpty()) {
+                        continue;
+                    }
+                    // Isolate failures per entity so one bad entity does not stop the others being checked.
+                    try {
+                        checkEntityBacklog(wfEngine, entity, metrics);
+                    } catch (Exception e) {
+                        LOG.error("Error while checking backlog metrics of entity " + entity.getName(), e);
+                    }
+                }
+            } catch (Throwable e) {
+                // Never let an exception escape: scheduleAtFixedRate suppresses all later runs if one throws.
+                LOG.error("Error while checking backlog metrics", e);
+            }
+        }
+
+        /**
+         * Removes the entity's instances from the backlog (memory and state store) when the entity is missing
+         * from the workflow engine, or when an instance's workflow status is SUCCEEDED.
+         */
+        private static void checkEntityBacklog(AbstractWorkflowEngine wfEngine, Entity entity,
+                                               List<MetricInfo> metrics) throws FalconException {
+            authenticateAsOwner(entity);
+            // Depends only on the entity, so ask the engine once rather than once per instance.
+            boolean entityMissing = wfEngine.isMissing(entity);
+            synchronized (metrics) {
+                Iterator<MetricInfo> iterator = metrics.iterator();
+                while (iterator.hasNext()) {
+                    MetricInfo metricInfo = iterator.next();
+                    String nominalTimeStr = metricInfo.getNominalTime();
+                    Date nominalTime;
+                    try {
+                        nominalTime = DATE_FORMAT.get().parse(nominalTimeStr);
+                    } catch (ParseException e) {
+                        LOG.error("Unable to parse date " + nominalTimeStr);
+                        continue;
+                    }
+                    if (entityMissing) {
+                        LOG.info("Entity of name {} was deleted so removing instance of "
+                                + "nominaltime {} ", entity.getName(), nominalTimeStr);
+                        backlogMetricStore.deleteMetricInstance(entity.getName(),
+                                metricInfo.getCluster(), nominalTime, entity.getEntityType());
+                        iterator.remove();
+                        continue;
+                    }
+                    InstancesResult status = wfEngine.getStatus(entity, nominalTime,
+                            nominalTime, null, null);
+                    if (status.getInstances().length > 0
+                            && status.getInstances()[0].status == InstancesResult.WorkflowStatus.SUCCEEDED) {
+                        LOG.debug("Instance of nominaltime {} of entity {} was succeeded, removing "
+                                + "from backlog entries", nominalTimeStr, entity.getName());
+                        backlogMetricStore.deleteMetricInstance(entity.getName(),
+                                metricInfo.getCluster(), nominalTime, entity.getEntityType());
+                        iterator.remove();
+                    }
+                }
+            }
+        }
+
+        /**
+         * Authenticates as the entity's ACL owner, or as the JVM's user.name when no owner is set.
+         * A missing ACL is treated like a missing owner (presumably the ACL element is optional -- confirm
+         * against the entity schema); dereferencing it unguarded used to abort the check for all entities.
+         */
+        private static void authenticateAsOwner(Entity entity) {
+            String owner = entity.getACL() == null ? null : entity.getACL().getOwner();
+            if (owner != null && !owner.isEmpty()) {
+                CurrentUser.authenticate(owner);
+            } else {
+                CurrentUser.authenticate(System.getProperty("user.name"));
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
index 57e46b7..a7cafeb 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -150,7 +150,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
                         LOG.info("Entity :"+ entityName
                                 + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "EntityType:"+ entityType
                                 + "missed SLAHigh");
-                        highSLAMissed(entityName, clusterName, entityType, nominalTime);
+                        highSLAMissed(entityName, clusterName, EntityType.valueOf(entityType), nominalTime);
                     }
                 }
             } catch (FalconException e){
@@ -160,12 +160,12 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
     }
 
     @Override
-    public void highSLAMissed(String entityName, String clusterName, String entityType , Date nominalTime
+    public void highSLAMissed(String entityName, String clusterName, EntityType entityType , Date nominalTime
                               ) throws FalconException {
         LOG.debug("Listners called...");
         for (EntitySLAListener listener : listeners) {
             listener.highSLAMissed(entityName, clusterName, entityType, nominalTime);
-            store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType);
+            store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType.name());
         }
     }
 }
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
index 421ea38..73d383b 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.service;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
 
 import java.util.Date;
 
@@ -25,6 +26,6 @@ import java.util.Date;
  * Interface for FeedSLAAlert to be used by Listeners.
  */
 public interface EntitySLAListener {
-    void highSLAMissed(String entityName, String clusterName, String entityType, Date nominalTime)
+    void highSLAMissed(String entityName, String clusterName, EntityType entityType, Date nominalTime)
             throws FalconException;
 }
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/util/MetricInfo.java b/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
new file mode 100644
index 0000000..694bb87
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.util;
+
+/**
+ * Storage for Backlog Metrics.
+ */
+public class MetricInfo {
+
+    private String nominalTime;
+    private String cluster;
+
+    public MetricInfo(String nominalTimeStr, String clusterName) {
+        this.nominalTime = nominalTimeStr;
+        this.cluster = clusterName;
+    }
+
+    public String getNominalTime() {
+        return nominalTime;
+    }
+
+    public String getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(String cluster) {
+        this.cluster = cluster;
+    }
+
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || !o.getClass().equals(this.getClass())) {
+            return false;
+        }
+
+        MetricInfo other = (MetricInfo) o;
+
+        boolean nominalTimeEqual = this.getNominalTime() != null
+                ? this.getNominalTime().equals(other.getNominalTime()) : other.getNominalTime() == null;
+
+        boolean clusterEqual = this.getCluster() != null
+                ? this.getCluster().equals(other.getCluster()) : other.getCluster() == null;
+
+        return this == other
+                || (nominalTimeEqual && clusterEqual);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = nominalTime != null ? nominalTime.hashCode() : 0;
+        result = 31 * result + (cluster != null ? cluster.hashCode() : 0);
+        return result;
+    }
+
+    public String toString() {
+        return "Nominaltime: " + this.getNominalTime() + " cluster: " + this.getCluster();
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java b/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
new file mode 100644
index 0000000..67d256e
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.jdbc.BacklogMetricStore;
+import org.apache.falcon.metrics.MetricNotificationService;
+import org.apache.falcon.tools.FalconStateStoreDBCLI;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test cases for Backlog Metric Store.
+ */
+public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
+    private static final String DB_BASE_DIR = "target/test-data/backlogmetricdb";
+    protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+    protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
+    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+    protected LocalFileSystem fs = new LocalFileSystem();
+
+    private static BacklogMetricStore backlogMetricStore;
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+    private static BacklogMetricEmitterService backlogMetricEmitterService;
+    private MetricNotificationService mockMetricNotificationService;
+
+    protected int execDBCLICommands(String[] args) {
+        return new FalconStateStoreDBCLI().run(args);
+    }
+
+    public void createDB(String file) {
+        File sqlFile = new File(file);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        Assert.assertEquals(0, result);
+        Assert.assertTrue(sqlFile.exists());
+
+    }
+
+    @AfterClass
+    public void cleanup() throws IOException {
+        cleanupDB();
+    }
+
+    private void cleanupDB() throws IOException {
+        fs.delete(new Path(DB_BASE_DIR), true);
+    }
+
+    @BeforeClass
+    public void setup() throws Exception{
+        StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
+        Configuration localConf = new Configuration();
+        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+        fs.mkdirs(new Path(DB_BASE_DIR));
+        createDB(DB_SQL_FILE);
+        falconJPAService.init();
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+        backlogMetricStore = new BacklogMetricStore();
+        mockMetricNotificationService = Mockito.mock(MetricNotificationService.class);
+        Mockito.when(mockMetricNotificationService.getName()).thenReturn("MetricNotificationService");
+        Services.get().register(mockMetricNotificationService);
+        Services.get().register(BacklogMetricEmitterService.get());
+        backlogMetricEmitterService = BacklogMetricEmitterService.get();
+
+    }
+
+
+    @Test
+    public void testBacklogEmitter() throws Exception {
+        backlogMetricEmitterService.init();
+        storeEntity(EntityType.PROCESS, "entity1");
+        backlogMetricEmitterService.highSLAMissed("entity1", "cluster1", EntityType.PROCESS,
+                BacklogMetricEmitterService.DATE_FORMAT.get().parse("2016-06-30T00-00Z"));
+        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<Long> valueCaptor = ArgumentCaptor.forClass(Long.class);
+        // Wait (up to 5s) for the 10ms-interval emitter instead of a fixed sleep, which is timing-dependent.
+        Mockito.verify(mockMetricNotificationService, Mockito.timeout(5000).atLeastOnce()).publish(captor.capture(),
+                valueCaptor.capture());
+        Assert.assertEquals(captor.getValue(), "falcon.cluster1.testPipeline.EXECUTION.entity1.backlogInMins");
+        WorkflowExecutionContext workflowExecutionContext = getWorkflowExecutionContext();
+        backlogMetricEmitterService.onSuccess(workflowExecutionContext);
+        // Let in-flight emits finish, then reset and observe a fresh window: verifying immediately after
+        // reset() would pass trivially because reset() also clears the recorded invocations.
+        Thread.sleep(100);
+        Mockito.reset(mockMetricNotificationService);
+        Thread.sleep(100);
+        Mockito.verify(mockMetricNotificationService, Mockito.times(0)).publish(Mockito.any(String.class),
+                Mockito.any(Long.class));
+
+    }
+
+    private WorkflowExecutionContext getWorkflowExecutionContext() {
+        Map<WorkflowExecutionArgs, String> args = new HashMap<>();
+        args.put(WorkflowExecutionArgs.ENTITY_TYPE, "process");
+        args.put(WorkflowExecutionArgs.CLUSTER_NAME, "cluster1");
+        args.put(WorkflowExecutionArgs.ENTITY_NAME, "entity1");
+        args.put(WorkflowExecutionArgs.NOMINAL_TIME, "2016-06-30-00-00");
+        args.put(WorkflowExecutionArgs.OPERATION, "GENERATE");
+        WorkflowExecutionContext workflowExecutionContext = new WorkflowExecutionContext(args);
+        return workflowExecutionContext;
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/prism/src/test/resources/startup.properties b/prism/src/test/resources/startup.properties new file mode 100644 index 0000000..d72dbba --- /dev/null +++ b/prism/src/test/resources/startup.properties @@ -0,0 +1,338 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +*.domain=debug + +######### Implementation classes ######### +## DONT MODIFY UNLESS SURE ABOUT CHANGE ## + +*.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine +*.lifecycle.engine.impl=org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory +*.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder +*.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder +*.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager +*.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService +*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager +*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService + +##### Falcon Services ##### +*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\ + org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ + org.apache.falcon.service.ProcessSubscriberService,\ + org.apache.falcon.extensions.ExtensionService,\ + org.apache.falcon.service.EntitySLAMonitoringService,\ + org.apache.falcon.service.LifecyclePolicyMap,\ + org.apache.falcon.entity.store.ConfigurationStore,\ + org.apache.falcon.rerun.service.RetryService,\ + org.apache.falcon.rerun.service.LateRunService,\ + org.apache.falcon.metadata.MetadataMappingService,\ + org.apache.falcon.service.LogCleanupService,\ + org.apache.falcon.service.GroupsService,\ + org.apache.falcon.service.ProxyUserService,\ + org.apache.falcon.service.FalconJPAService +##Add if you want to send data to graphite +# org.apache.falcon.metrics.MetricNotificationService\ +## Add if you want to use Falcon Azure integration ## +# org.apache.falcon.adfservice.ADFProviderService +## If you wish to use Falcon native scheduler add the commented out services below to application.services ## +# org.apache.falcon.notification.service.impl.JobCompletionService,\ +# org.apache.falcon.notification.service.impl.SchedulerService,\ +# 
org.apache.falcon.notification.service.impl.AlarmService,\ +# org.apache.falcon.notification.service.impl.DataAvailabilityService,\ +# org.apache.falcon.execution.FalconExecutionService,\ + + + +# List of Lifecycle policies configured. +*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete +# List of builders for the policies. +*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder +##### Falcon Configuration Store Change listeners ##### +*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ + org.apache.falcon.entity.ColoClusterRelation,\ + org.apache.falcon.group.FeedGroupMap,\ + org.apache.falcon.entity.store.FeedLocationStore,\ + org.apache.falcon.service.EntitySLAMonitoringService,\ + org.apache.falcon.service.SharedLibraryHostingService +## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ## +# org.apache.falcon.state.store.jdbc.JdbcStateStore + +## If you wish to use Feed Alert to know when a feed misses a high SLA register your class here +*.feedAlert.listeners= + +##### JMS MQ Broker Implementation class ##### +*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory + +##### List of shared libraries for Falcon workflows ##### +*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el + +##### Workflow Job Execution Completion listeners ##### +*.workflow.execution.listeners= + +######### Implementation classes ######### + + +######### System startup parameters ######### + +# Location of libraries that is shipped to Hadoop +*.system.lib.location=${FALCON_HOME}/sharedlibs + +# Location to store user entity configurations + +#Configurations used in UTs +debug.config.store.uri=file://${user.dir}/target/store +#Location to store state of Feed SLA monitoring service +debug.feed.sla.service.store.uri= 
file://${user.dir}/target/data/sla/pendingfeedinstances +debug.config.oozie.conf.uri=${user.dir}/target/oozie +debug.system.lib.location=${system.lib.location} +debug.broker.url=vm://localhost +debug.retry.recorder.path=${user.dir}/target/retry +debug.libext.feed.retention.paths=${falcon.libext} +debug.libext.feed.replication.paths=${falcon.libext} +debug.libext.process.paths=${falcon.libext} + +debug.extension.store.uri=file://${user.dir}/target/extension/store + +#Configurations used in ITs +it.config.store.uri=file://${user.dir}/target/store +it.config.oozie.conf.uri=${user.dir}/target/oozie +it.system.lib.location=${system.lib.location} +it.broker.url=tcp://localhost:61616 +it.retry.recorder.path=${user.dir}/target/retry +it.libext.feed.retention.paths=${falcon.libext} +it.libext.feed.replication.paths=${falcon.libext} +it.libext.process.paths=${falcon.libext} +it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler + +*.falcon.cleanup.service.frequency=minutes(5) + +######### Properties for Feed SLA Monitoring ######### +# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour +*.feed.sla.serialization.frequency.millis=3600000 + +# Maximum number of pending instances per feed that will be recorded. After this older instances will be removed in +# a FIFO fashion. +*.feed.sla.queue.size=288 + +# Do not change unless really sure +# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60 +*.feed.sla.statusCheck.frequency.seconds=600 + +# Do not change unless really sure +# Time Duration (in milliseconds) in future for generating pending feed instances. +# In every cycle pending feed instances are added for monitoring, till this time in future. 
+# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000 +*.feed.sla.lookAheadWindow.millis=900000 + + +######### Properties for configuring JMS provider - activemq ######### +# Default Active MQ url +*.broker.url=tcp://localhost:61616 + +# default time-to-live for a JMS message 3 days (time in minutes) +*.broker.ttlInMins=4320 +*.entity.topic=FALCON.ENTITY.TOPIC +*.max.retry.failure.count=1 +*.retry.recorder.path=${user.dir}/logs/retry + +######### Properties for configuring iMon client and metric ######### +*.internal.queue.size=1000 + + +######### Graph Database Properties ######### +# Graph implementation +*.falcon.graph.blueprints.graph=com.thinkaurelius.titan.core.TitanFactory + +# Graph Storage +# IMPORTANT: Please enable one of the graph db backend: hbase or berkeleydb, per instructions below. + +# Enable the following for Berkeley DB. Make sure je-5.0.73.jar is downloaded and available +# under Falcon webapp directory or under falcon server classpath. +#*.falcon.graph.storage.backend=berkeleyje +#*.falcon.graph.storage.directory=/${falcon.home}/data/graphdb +#*.falcon.graph.serialize.path=${user.dir}/target/graphdb + +# Enable the following for HBase +#*.falcon.graph.storage.backend=hbase +# For standalone mode , set hostname to localhost; for distributed mode, set to the zookeeper quorum +# @see http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2 +#*.falcon.graph.storage.hostname=localhost +#*.falcon.graph.storage.hbase.table=falcon_titan + +# Avoid acquiring read lock when iterating over large graphs +# See http://s3.thinkaurelius.com/docs/titan/0.5.4/bdb.html +*.falcon.graph.storage.transactions=false + +# Uncomment and override the following properties for enabling metrics for titan db and pushing them to graphite. You +# can use other reporters like ganglia also. 
+# Refer (http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring)for finding the +# relevant configurations for your use case. NOTE: you have to prefix all the properties with "*.falcon.graph." +# *.falcon.graph.storage.enable-basic-metrics = true +# Required; IP or hostname string +# *.falcon.graph.metrics.graphite.hostname = 192.168.0.1 +# Required; specify logging interval in milliseconds +# *.falcon.graph.metrics.graphite.interval = 60000 + +######### Authentication Properties ######### + +# Authentication type must be specified: simple|kerberos +*.falcon.authentication.type=simple + +##### Service Configuration + +# Indicates the Kerberos principal to be used in Falcon Service. +*.falcon.service.authentication.kerberos.principal= + +# Location of the keytab file with the credentials for the Service principal. +*.falcon.service.authentication.kerberos.keytab= + +# name node principal to talk to config store +*.dfs.namenode.kerberos.principal= + +##### SPNEGO Configuration + +# Authentication type must be specified: simple|kerberos|<class> +# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility +*.falcon.http.authentication.type=simple + +# Indicates how long (in seconds) an authentication token is valid before it has to be renewed. +*.falcon.http.authentication.token.validity=36000 + +# The signature secret for signing the authentication tokens. +*.falcon.http.authentication.signature.secret=falcon + +# The domain to use for the HTTP cookie that stores the authentication token. +*.falcon.http.authentication.cookie.domain= + +# Indicates if anonymous requests are allowed when using 'simple' authentication. +*.falcon.http.authentication.simple.anonymous.allowed=false + +# Indicates the Kerberos principal to be used for HTTP endpoint. +# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification. 
+*.falcon.http.authentication.kerberos.principal= + +# Location of the keytab file with the credentials for the HTTP principal. +*.falcon.http.authentication.kerberos.keytab= + +# The kerberos names rules is to resolve kerberos principal names, refer to Hadoop's KerberosName for more details. +*.falcon.http.authentication.kerberos.name.rules=DEFAULT + +# Comma separated list of black listed users +*.falcon.http.authentication.blacklisted.users= + +######### Authentication Properties ######### + + +######### Authorization Properties ######### + +# Authorization Enabled flag: false (default)|true +*.falcon.security.authorization.enabled=false + +# The name of the group of super-users +*.falcon.security.authorization.superusergroup=falcon + +# Admin Users, comma separated users +*.falcon.security.authorization.admin.users=falcon,ambari-qa + +# Admin Group Membership, comma separated users +*.falcon.security.authorization.admin.groups=falcon,staff + +# Authorization Provider Implementation Fully Qualified Class Name +*.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider + +######### Authorization Properties ######### + +######### ADF Configurations start ######### + +# A String object that represents the namespace +*.microsoft.windowsazure.services.servicebus.namespace= + +# Request and status queues on the namespace +*.microsoft.windowsazure.services.servicebus.requestqueuename= +*.microsoft.windowsazure.services.servicebus.statusqueuename= + +# A String object that contains the SAS key name +*.microsoft.windowsazure.services.servicebus.sasKeyName= + +# A String object that contains the SAS key +*.microsoft.windowsazure.services.servicebus.sasKey= + +# A String object containing the base URI that is added to your Service Bus namespace to form the URI to connect +# to the Service Bus service. 
To access the default public Azure service, pass ".servicebus.windows.net" +*.microsoft.windowsazure.services.servicebus.serviceBusRootUri= + +# Service bus polling frequency +*.microsoft.windowsazure.services.servicebus.polling.frequency= + +# Super user +*.microsoft.windowsazure.services.servicebus.superuser= + +######### ADF Configurations end ########### + +######### SMTP Properties ######## + +# Setting SMTP hostname +#*.falcon.email.smtp.host=localhost + +# Setting SMTP port number +#*.falcon.email.smtp.port=25 + +# Setting email from address +#*.falcon.email.from.address=falcon@localhost + +# Setting email Auth +#*.falcon.email.smtp.auth=false + +#Setting user name +#*.falcon.email.smtp.user="" + +#Setting password +#*.falcon.email.smtp.password="" + +# Setting monitoring plugin, if SMTP parameters is defined +#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\ +# org.apache.falcon.plugin.EmailNotificationPlugin + +######### StateStore Properties ##### +#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore +#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver +#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true +#*.falcon.statestore.jdbc.username=sa +#*.falcon.statestore.jdbc.password= +#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource +## Maximum number of active connections that can be allocated from this pool at the same time. +#*.falcon.statestore.pool.max.active.conn=10 +#*.falcon.statestore.connection.properties= +## Indicates the interval (in milliseconds) between eviction runs. +#*.falcon.statestore.validate.db.connection.eviction.interval=300000 +## The number of objects to examine during each run of the idle object evictor thread. +#*.falcon.statestore.validate.db.connection.eviction.num=10 +## Creates Falcon DB. +## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. 
+## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. +#*.falcon.statestore.create.db.schema=true + +# Graphite properties +*.falcon.graphite.hostname=localhost +*.falcon.graphite.port=2003 +*.falcon.graphite.frequency=1 +*.falcon.graphite.prefix=falcon + +# Backlog Metric Properties +*.falcon.backlog.metricservice.emit.interval.millisecs=10 +*.falcon.backlog.metricservice.recheck.interval.millisecs=1000 http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index 6dbec0c..7b7da0a 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -128,6 +128,11 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { } @Override + public boolean isMissing(Entity entity) throws FalconException { + return !STATE_STORE.entityExists(new EntityID(entity)); + } + + @Override public String suspend(Entity entity) throws FalconException { EXECUTION_SERVICE.suspend(entity); return "SUCCESS"; http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/src/build/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml index 5c35b8c..346583d 100644 --- a/src/build/findbugs-exclude.xml +++ b/src/build/findbugs-exclude.xml @@ -57,6 +57,12 @@ <Bug pattern="UWF_UNWRITTEN_FIELD,NP_BOOLEAN_RETURN_NULL" /> </Match> + <Match> + <Class name="org.apache.falcon.persistence.BacklogMetricBean" /> + <Bug pattern="UWF_UNWRITTEN_FIELD,NP_BOOLEAN_RETURN_NULL" /> + 
</Match> + + <Match> <Class name="org.apache.falcon.persistence.MonitoredEntityBean" /> http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index a107eca..ef07e57 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -58,6 +58,15 @@ ##Add if you want to send data to graphite # org.apache.falcon.metrics.MetricNotificationService\ + +##Add if you want to enable BacklogMetricService +# org.apache.falcon.service.FalconJPAService,\ +# org.apache.falcon.metrics.MetricNotificationService,\ +# org.apache.falcon.service.EntitySLAMonitoringService,\ +# org.apache.falcon.service.EntitySLAAlertService,\ +# org.apache.falcon.service.BacklogMetricEmitterService + + ## Add if you want to use Falcon Azure integration ## # org.apache.falcon.adfservice.ADFProviderService ## If you wish to use Falcon native scheduler uncomment out below application services and comment out above application services ## @@ -160,6 +169,9 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ # It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000 *.feed.sla.lookAheadWindow.millis=900000 +##Add if you want to enable BacklogMetricService +#*.feedAlert.listeners=org.apache.falcon.service.BacklogMetricEmitterService + ######### Properties for configuring JMS provider - activemq ######### # Default Active MQ url *.broker.url=tcp://localhost:61616 @@ -337,3 +349,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ #*.falcon.graphite.port=2003 #*.falcon.graphite.frequency=1 #*.falcon.graphite.prefix=falcon + +# Backlog Metric Properties +#*.falcon.backlog.metricservice.emit.interval.millisecs=60000 +#*.falcon.backlog.metricservice.recheck.interval.millisecs=600000
