Repository: falcon Updated Branches: refs/heads/master 6c787783b -> f40926669
Add running, succeeded, failed, killed and suspended instances to Titan DB based on JMS notifications on workflow jobs. Instance-entity edge properties (e.g. nominal time, status) are also added for vertex-centric indexing. Author: yzheng-hortonworks <[email protected]> Reviewers: Balu Vellanki <[email protected]>, Venkatesan Ramachandran <[email protected]> Closes #47 from yzheng-hortonworks/FALCON-1111 and squashes the following commits: f2c9faf [yzheng-hortonworks] review by Balu ff9556b [yzheng-hortonworks] review from Venky b76f642 [yzheng-hortonworks] FALCON-1111 Instance update on titan DB based on JMS notifications on workflow jobs Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f4092666 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f4092666 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f4092666 Branch: refs/heads/master Commit: f40926669a68d7ad3d548d558e490df6d53a3e0b Parents: 6c78778 Author: yzheng-hortonworks <[email protected]> Authored: Fri Mar 4 17:18:38 2016 -0800 Committer: bvellanki <[email protected]> Committed: Fri Mar 4 17:18:38 2016 -0800 ---------------------------------------------------------------------- .../InstanceRelationshipGraphBuilder.java | 89 ++++++++++---------- .../falcon/metadata/MetadataMappingService.java | 83 ++++++++++-------- .../metadata/RelationshipGraphBuilder.java | 9 +- .../falcon/metadata/RelationshipProperty.java | 6 +- common/src/main/resources/startup.properties | 4 + src/conf/startup.properties | 4 + 6 files changed, 112 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java index b709857..1d48f75 100644 --- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java @@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory; import java.net.URISyntaxException; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.TimeZone; /** @@ -78,8 +80,9 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsLong()); addWorkflowInstanceProperties(processInstance, context); + Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context); addInstanceToEntity(processInstance, context.getEntityName(), - RelationshipType.PROCESS_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE); + RelationshipType.PROCESS_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE, properties); addInstanceToEntity(processInstance, context.getClusterName(), RelationshipType.CLUSTER_ENTITY, RelationshipLabel.PROCESS_CLUSTER_EDGE); addInstanceToEntity(processInstance, context.getWorkflowUser(), @@ -153,15 +156,14 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { public void addInstanceToEntity(Vertex instanceVertex, String entityName, RelationshipType entityType, RelationshipLabel edgeLabel, - String timestamp) { + Map<RelationshipProperty, String> properties) { Vertex entityVertex = findVertex(entityName, entityType); LOG.info("Vertex exists? 
name={}, type={}, v={}", entityName, entityType, entityVertex); if (entityVertex == null) { LOG.error("Illegal State: {} vertex must exist for {}", entityType, entityName); throw new IllegalStateException(entityType + " entity vertex must exist " + entityName); } - - addEdge(instanceVertex, entityVertex, edgeLabel.getName(), timestamp); + addEdge(instanceVertex, entityVertex, edgeLabel.getName(), properties); } public void addOutputFeedInstances(WorkflowExecutionContext context, @@ -215,19 +217,13 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { feedInstanceDataPath, targetClusterName); String feedInstanceName = getFeedInstanceName(feedName, targetClusterName, feedInstanceDataPath, context.getNominalTimeAsISO8601()); - Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE); - - LOG.info("Vertex exists? name={}, type={}, v={}", - feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex); - if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon - LOG.info("{} instance vertex {} does not exist, add it", - RelationshipType.FEED_INSTANCE, feedInstanceName); - feedInstanceVertex = addFeedInstance(// add a new instance - feedInstanceName, context, feedName, context.getSrcClusterName()); - } + Vertex feedInstanceVertex = addFeedInstance( + feedInstanceName, context, feedName, context.getSrcClusterName()); + Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context); + properties.put(RelationshipProperty.TIMESTAMP, context.getTimeStampAsISO8601()); addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY, - RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601()); + RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, properties); addCounters(feedInstanceVertex, context); } @@ -250,20 +246,13 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { feedName, 
evictedFeedInstancePath, clusterName); String feedInstanceName = getFeedInstanceName(feedName, clusterName, evictedFeedInstancePath, context.getNominalTimeAsISO8601()); - Vertex feedInstanceVertex = findVertex(feedInstanceName, - RelationshipType.FEED_INSTANCE); - - LOG.info("Vertex exists? name={}, type={}, v={}", - feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex); - if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon - LOG.info("{} instance vertex {} does not exist, add it", - RelationshipType.FEED_INSTANCE, feedInstanceName); - feedInstanceVertex = addFeedInstance(// add a new instance - feedInstanceName, context, feedName, clusterName); - } + Vertex feedInstanceVertex = addFeedInstance( + feedInstanceName, context, feedName, clusterName); + Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context); + properties.put(RelationshipProperty.TIMESTAMP, context.getTimeStampAsISO8601()); addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY, - RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE, context.getTimeStampAsISO8601()); + RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE, properties); } } @@ -280,20 +269,15 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { feedInstanceDataPath, sourceClusterName, datasourceName); String feedInstanceName = getFeedInstanceName(feedName, sourceClusterName, feedInstanceDataPath, context.getNominalTimeAsISO8601()); - Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE); - - LOG.info("Vertex exists? 
name={}, type={}, v={}", - feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex); - if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon - LOG.info("{} instance vertex {} does not exist, add it", - RelationshipType.FEED_INSTANCE, feedInstanceName); - feedInstanceVertex = addFeedInstance(// add a new instance - feedInstanceName, context, feedName, context.getSrcClusterName()); - } + Vertex feedInstanceVertex = addFeedInstance( + feedInstanceName, context, feedName, context.getSrcClusterName()); + + Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context); + properties.put(RelationshipProperty.TIMESTAMP, context.getTimeStampAsISO8601()); addInstanceToEntity(feedInstanceVertex, datasourceName, RelationshipType.DATASOURCE_ENTITY, - RelationshipLabel.DATASOURCE_IMPORT_EDGE, context.getTimeStampAsISO8601()); + RelationshipLabel.DATASOURCE_IMPORT_EDGE, properties); addInstanceToEntity(feedInstanceVertex, sourceClusterName, RelationshipType.CLUSTER_ENTITY, - RelationshipLabel.FEED_CLUSTER_EDGE, context.getTimeStampAsISO8601()); + RelationshipLabel.FEED_CLUSTER_EDGE, properties); } public String getImportInstanceName(WorkflowExecutionContext context) { @@ -308,18 +292,30 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { feedInstanceDataPath, clusterName); String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstanceDataPath, context.getNominalTimeAsISO8601()); - Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName); + Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName, false); addProcessFeedEdge(processInstance, feedInstance, edgeLabel); } private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context, String feedName, String clusterName) throws FalconException { + return addFeedInstance(feedInstanceName, context, feedName, clusterName, true); + } + + 
private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context, String feedName, + String clusterName, boolean hasEdgeProperties) throws FalconException { LOG.info("Adding feed instance {}", feedInstanceName); Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE, context.getTimeStampAsLong()); - - addInstanceToEntity(feedInstance, feedName, - RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE); + feedInstance.setProperty(RelationshipProperty.STATUS.getName(), context.getValue(WorkflowExecutionArgs.STATUS)); + + if (hasEdgeProperties) { + Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context); + addInstanceToEntity(feedInstance, feedName, + RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE, properties); + } else { + addInstanceToEntity(feedInstance, feedName, + RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE); + } addInstanceToEntity(feedInstance, clusterName, RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE); addInstanceToEntity(feedInstance, context.getWorkflowUser(), @@ -334,6 +330,13 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { return feedInstance; } + private Map<RelationshipProperty, String> edgePropertiesForIndexing(WorkflowExecutionContext context) { + Map<RelationshipProperty, String> properties = new HashMap<RelationshipProperty, String>(); + properties.put(RelationshipProperty.NOMINAL_TIME, context.getNominalTimeAsISO8601()); + properties.put(RelationshipProperty.STATUS, context.getValue(WorkflowExecutionArgs.STATUS)); + return properties; + } + public static String getFeedInstanceName(String feedName, String clusterName, String feedInstancePath, String nominalTime) throws FalconException { http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java 
---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java index cf2b651..7d22fd5 100644 --- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java +++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java @@ -257,14 +257,42 @@ public class MetadataMappingService } @Override + public void onStart(final WorkflowExecutionContext context) throws FalconException { + LOG.info("onStart {}", context); + onInstanceExecutionUpdate(context); + } + + @Override public void onSuccess(final WorkflowExecutionContext context) throws FalconException { - LOG.info("Adding lineage for context {}", context); + LOG.info("onSuccess {}", context); + onInstanceExecutionUpdate(context); + } + + @Override + public void onFailure(final WorkflowExecutionContext context) throws FalconException { + LOG.info("onFailure {}", context); + onInstanceExecutionUpdate(context); + } + + @Override + public void onSuspend(final WorkflowExecutionContext context) throws FalconException { + LOG.info("onSuspend {}", context); + onInstanceExecutionUpdate(context); + } + + @Override + public void onWait(final WorkflowExecutionContext context) throws FalconException { + LOG.info("onWait {}", context); + onInstanceExecutionUpdate(context); + } + + private void onInstanceExecutionUpdate(final WorkflowExecutionContext context) throws FalconException { try { new TransactionRetryHelper.Builder<Void>(getTransactionalGraph()) .perform(new TransactionWork<Void>() { @Override public Void execute(TransactionalGraph transactionalGraph) throws Exception { - onSuccessfulExecution(context); + updateInstanceStatus(context); transactionalGraph.commit(); return null; } @@ -275,64 +303,49 @@ public class MetadataMappingService } } - private void onSuccessfulExecution(final WorkflowExecutionContext context) 
throws FalconException { + private void updateInstanceStatus(final WorkflowExecutionContext context) throws FalconException { + if (context.getContextType() == WorkflowExecutionContext.Type.COORDINATOR_ACTION) { + // TODO(yzheng): FALCON-1776 Instance update on titan DB based on JMS notifications on coordinator actions + return; + } + WorkflowExecutionContext.EntityOperations entityOperation = context.getOperation(); switch (entityOperation) { case GENERATE: - onProcessInstanceExecuted(context); + updateProcessInstance(context); break; case REPLICATE: - onFeedInstanceReplicated(context); + updateReplicatedFeedInstance(context); break; case DELETE: - onFeedInstanceEvicted(context); + updateEvictedFeedInstance(context); break; case IMPORT: - onFeedInstanceImported(context); + updateImportedFeedInstance(context); break; default: throw new IllegalArgumentException("Invalid EntityOperation - " + entityOperation); } } - @Override - public void onFailure(WorkflowExecutionContext context) throws FalconException { - // do nothing since lineage is only recorded for successful workflow - } - - @Override - public void onStart(WorkflowExecutionContext context) throws FalconException { - // Do nothing - } - - @Override - public void onSuspend(WorkflowExecutionContext context) throws FalconException { - // Do nothing - } - - @Override - public void onWait(WorkflowExecutionContext context) throws FalconException { - // TBD - } - - - private void onProcessInstanceExecuted(WorkflowExecutionContext context) throws FalconException { + private void updateProcessInstance(WorkflowExecutionContext context) throws FalconException { + LOG.info("Updating process instance: {}", context.getNominalTimeAsISO8601()); Vertex processInstance = instanceGraphBuilder.addProcessInstance(context); instanceGraphBuilder.addOutputFeedInstances(context, processInstance); instanceGraphBuilder.addInputFeedInstances(context, processInstance); } - private void onFeedInstanceReplicated(WorkflowExecutionContext 
context) throws FalconException { - LOG.info("Adding replicated feed instance: {}", context.getNominalTimeAsISO8601()); + private void updateReplicatedFeedInstance(WorkflowExecutionContext context) throws FalconException { + LOG.info("Updating replicated feed instance: {}", context.getNominalTimeAsISO8601()); instanceGraphBuilder.addReplicatedInstance(context); } - private void onFeedInstanceEvicted(WorkflowExecutionContext context) throws FalconException { - LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601()); + private void updateEvictedFeedInstance(WorkflowExecutionContext context) throws FalconException { + LOG.info("Updating evicted feed instance: {}", context.getNominalTimeAsISO8601()); instanceGraphBuilder.addEvictedInstance(context); } - private void onFeedInstanceImported(WorkflowExecutionContext context) throws FalconException { - LOG.info("Adding imported feed instance: {}", context.getNominalTimeAsISO8601()); + private void updateImportedFeedInstance(WorkflowExecutionContext context) throws FalconException { + LOG.info("Updating imported feed instance: {}", context.getNominalTimeAsISO8601()); instanceGraphBuilder.addImportedInstance(context); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java index 0c3fcee..32caca3 100644 --- a/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipGraphBuilder.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.Date; import java.util.Iterator; +import java.util.Map; /** * Base class for Metadata relationship mapping helper. 
@@ -113,12 +114,14 @@ public abstract class RelationshipGraphBuilder { } protected Edge addEdge(Vertex fromVertex, Vertex toVertex, - String edgeLabel, String timestamp) { + String edgeLabel, Map<RelationshipProperty, String> properties) { Edge edge = findEdge(fromVertex, toVertex, edgeLabel); Edge edgeToVertex = edge != null ? edge : fromVertex.addEdge(edgeLabel, toVertex); - if (timestamp != null) { - edgeToVertex.setProperty(RelationshipProperty.TIMESTAMP.getName(), timestamp); + if (properties != null) { + for (Map.Entry<RelationshipProperty, String> property : properties.entrySet()) { + edgeToVertex.setProperty(property.getKey().getName(), property.getValue()); + } } return edgeToVertex; http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java index ff437d9..fd23d4f 100644 --- a/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java +++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipProperty.java @@ -23,7 +23,7 @@ package org.apache.falcon.metadata; */ public enum RelationshipProperty { - // vertex property keys - indexed + // vertex/edge property keys - indexed NAME("name"), TYPE("type"), TIMESTAMP("timestamp"), @@ -39,8 +39,10 @@ public enum RelationshipProperty { RUN_ID("runId", "current run-id of the instance"), STATUS("status", "status of the user workflow instance"), WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex: oozie"), - USER_SUBFLOW_ID("subflowId", "external id of user workflow"); + USER_SUBFLOW_ID("subflowId", "external id of user workflow"), + // instance-entity edge property + NOMINAL_TIME("nominalTime"); private final String name; private final String description; 
http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index 2497cce..123d63c 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -156,6 +156,10 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle *.falcon.graph.transaction.retry.count=3 *.falcon.graph.transaction.retry.delay=5 +# Avoid acquiring read lock when iterating over large graphs +# See http://s3.thinkaurelius.com/docs/titan/0.5.4/bdb.html +*.falcon.graph.storage.transactions=false + # Uncomment and override the following properties for enabling metrics for titan db and pushing them to graphite. You # can use other reporters like ganglia also. # Refer (http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring)for finding the http://git-wip-us.apache.org/repos/asf/falcon/blob/f4092666/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index faaedfa..51a791e 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -169,6 +169,10 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.falcon.graph.transaction.retry.count=3 *.falcon.graph.transaction.retry.delay=5 +# Avoid acquiring read lock when iterating over large graphs +# See http://s3.thinkaurelius.com/docs/titan/0.5.4/bdb.html +*.falcon.graph.storage.transactions=false + # Uncomment and override the following properties for enabling metrics for titan db and pushing them to graphite. You # can use other reporters like ganglia also. # Refer (http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring)for finding the
