Repository: falcon Updated Branches: refs/heads/master 8736fcbe8 -> ade3079bf
FALCON-1838 Export instances are not added graph db for lineage tracking Author: Venkatesan Ramachandran <[email protected]> Reviewers: "Balu Vellanki <[email protected]>, Ying Zheng <[email protected]>" Closes #74 from vramachan/master Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ade3079b Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ade3079b Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ade3079b Branch: refs/heads/master Commit: ade3079bfe6b2b811048a0e43806000f68e17db7 Parents: 8736fcb Author: Venkatesan Ramachandran <[email protected]> Authored: Thu Mar 17 09:01:14 2016 -0700 Committer: bvellanki <[email protected]> Committed: Thu Mar 17 09:01:14 2016 -0700 ---------------------------------------------------------------------- .../InstanceRelationshipGraphBuilder.java | 26 ++++++++++++-------- .../falcon/metadata/MetadataMappingService.java | 7 ++++++ .../falcon/metadata/RelationshipLabel.java | 1 + 3 files changed, 24 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/ade3079b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java index 1d48f75..f9cd2b9 100644 --- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java +++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java @@ -256,17 +256,27 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { } } - public void addImportedInstance(WorkflowExecutionContext context) throws FalconException { + addImportExportInstanceHelper(context, RelationshipLabel.DATASOURCE_IMPORT_EDGE); + } - String feedName = context.getOutputFeedNames(); - String feedInstanceDataPath = context.getOutputFeedInstancePaths(); + public void addExportedInstance(WorkflowExecutionContext context) throws FalconException { + addImportExportInstanceHelper(context, RelationshipLabel.DATASOURCE_EXPORT_EDGE); + } + + private void addImportExportInstanceHelper(WorkflowExecutionContext context, + RelationshipLabel label) throws FalconException { + String feedName = (label == RelationshipLabel.DATASOURCE_IMPORT_EDGE) + ? context.getOutputFeedNames() : context.getInputFeedNames(); + String feedInstanceDataPath = (label == RelationshipLabel.DATASOURCE_IMPORT_EDGE) + ? context.getOutputFeedInstancePaths() : context.getInputFeedInstancePaths(); String datasourceName = context.getDatasourceName(); String sourceClusterName = context.getSrcClusterName(); - LOG.info("Computing import feed instance for : name= {} path= {}, in cluster: {} " - + "from datasource: {}", feedName, + LOG.info("Computing {} feed instance for : name= {} path= {}, in cluster: {} " + + "from datasource: {}", label.getName(), feedName, feedInstanceDataPath, sourceClusterName, datasourceName); + String feedInstanceName = getFeedInstanceName(feedName, sourceClusterName, feedInstanceDataPath, context.getNominalTimeAsISO8601()); Vertex feedInstanceVertex = addFeedInstance( @@ -275,15 +285,11 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder { Map<RelationshipProperty, String> properties = edgePropertiesForIndexing(context); properties.put(RelationshipProperty.TIMESTAMP, context.getTimeStampAsISO8601()); addInstanceToEntity(feedInstanceVertex, datasourceName, RelationshipType.DATASOURCE_ENTITY, - RelationshipLabel.DATASOURCE_IMPORT_EDGE, properties); + label, properties); addInstanceToEntity(feedInstanceVertex, sourceClusterName, RelationshipType.CLUSTER_ENTITY, RelationshipLabel.FEED_CLUSTER_EDGE, properties); } - public String getImportInstanceName(WorkflowExecutionContext context) { - return context.getEntityName() + "/" + context.getNominalTimeAsISO8601(); - } - private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel, WorkflowExecutionContext context, String feedName, String feedInstanceDataPath) throws FalconException { http://git-wip-us.apache.org/repos/asf/falcon/blob/ade3079b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java index 9f4920c..66a3a58 100644 --- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java +++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java @@ -345,6 +345,9 @@ public class MetadataMappingService case IMPORT: updateImportedFeedInstance(context); break; + case EXPORT: + updateExportedFeedInstance(context); + break; default: throw new IllegalArgumentException("Invalid EntityOperation - " + entityOperation); } @@ -370,4 +373,8 @@ public class MetadataMappingService LOG.info("Updating imported feed instance: {}", context.getNominalTimeAsISO8601()); instanceGraphBuilder.addImportedInstance(context); } + private void updateExportedFeedInstance(WorkflowExecutionContext context) throws FalconException { + LOG.info("Updating export feed instance: {}", context.getNominalTimeAsISO8601()); + instanceGraphBuilder.addExportedInstance(context); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/ade3079b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java index 6d4bf46..a146957 100644 --- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java +++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java @@ -29,6 +29,7 @@ public enum RelationshipLabel { FEED_PROCESS_EDGE("input"), PROCESS_FEED_EDGE("output"), DATASOURCE_IMPORT_EDGE("import"), + DATASOURCE_EXPORT_EDGE("export"), // instance edge labels INSTANCE_ENTITY_EDGE("instance-of"),
