Repository: falcon Updated Branches: refs/heads/master 0a8d0cd3d -> a6107758f
FALCON-1212 Remove dependency on Gremlin. Contributed by Ajay Yadava Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/a6107758 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/a6107758 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/a6107758 Branch: refs/heads/master Commit: a6107758f125163c70f53570581e4dc5b9e15e87 Parents: 0a8d0cd Author: Suhas Vasu <[email protected]> Authored: Wed Jun 3 12:52:29 2015 +0530 Committer: Suhas Vasu <[email protected]> Committed: Wed Jun 3 12:52:29 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + pom.xml | 6 -- prism/pom.xml | 7 -- .../metadata/LineageMetadataResource.java | 93 ++++++++++---------- 4 files changed, 50 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/a6107758/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 78cc5e4..4d02409 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,8 @@ Trunk (Unreleased) NEW FEATURES IMPROVEMENTS + FALCON-1212 Remove dependency on Gremlin (Ajay Yadava via Suhas Vasu) + FALCON-1211 Source tarball are not generated in mvn assembly when profile is distributed (Shaik Idris Ali) http://git-wip-us.apache.org/repos/asf/falcon/blob/a6107758/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 77d70f6..4d1dbb4 100644 --- a/pom.xml +++ b/pom.xml @@ -621,12 +621,6 @@ </dependency> <dependency> - <groupId>com.tinkerpop.gremlin</groupId> - <artifactId>gremlin-java</artifactId> - <version>2.6.0</version> - </dependency> - - <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>3.0.3.RELEASE</version> http://git-wip-us.apache.org/repos/asf/falcon/blob/a6107758/prism/pom.xml ---------------------------------------------------------------------- diff --git a/prism/pom.xml b/prism/pom.xml index af9b132..52b558d 100644 --- a/prism/pom.xml +++ b/prism/pom.xml @@ -61,13 +61,6 @@ </dependency> <dependency> - <groupId>com.tinkerpop.gremlin</groupId> - <artifactId>gremlin-java</artifactId> - </dependency> - - - - <dependency> <groupId>org.apache.falcon</groupId> <artifactId>falcon-test-util</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/falcon/blob/a6107758/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java index 8fcb50f..f8b503a 100644 --- a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java +++ b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java @@ -26,10 +26,9 @@ import com.tinkerpop.blueprints.Vertex; import com.tinkerpop.blueprints.VertexQuery; import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode; import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility; -import com.tinkerpop.gremlin.java.GremlinPipeline; import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Input; @@ -58,6 +57,8 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -102,27 +103,34 @@ public class LineageMetadataResource extends AbstractMetadataResource { @Monitored(event = "entity-lineage") public Response getEntityLineageGraph(@Dimension("pipeline") @QueryParam("pipeline") final String pipeline) { LOG.info("Get lineage Graph for pipeline:({})", pipeline); - - try { - Iterable<Vertex> processes; - if (StringUtils.isNotBlank(pipeline)) { - Iterable<Vertex> pipelineNode = getGraph().getVertices(RelationshipProperty.NAME.getName(), - pipeline); - if (!pipelineNode.iterator().hasNext()) { - throw FalconWebException.newException("No pipelines found for " + pipeline, - Response.Status.BAD_REQUEST); + List<Process> processes = new ArrayList<>(); + if (StringUtils.isNotBlank(pipeline)) { + try { + Collection<String> res = ConfigurationStore.get().getEntities(EntityType.PROCESS); + for (String processName : res) { + Process p = EntityUtil.getEntity(EntityType.PROCESS, processName); + String tags = p.getPipelines(); + if (StringUtils.isNotEmpty(tags)) { + for (String tag : tags.split(",")) { + if (StringUtils.equals(tag.trim(), pipeline.trim())) { + processes.add(p); + } + } + } } - Vertex v = pipelineNode.iterator().next(); // pipeline names are unique - processes = new GremlinPipeline(v).in(RelationshipLabel.PIPELINES.getName()) - .has(RelationshipProperty.TYPE.getName(), RelationshipType.PROCESS_ENTITY.getName()); - return Response.ok(buildJSONGraph(processes)).build(); + } catch (Exception e) { + LOG.error("Error while fetching entity lineage: ", e); + throw FalconWebException.newException(e, Response.Status.INTERNAL_SERVER_ERROR); } - throw FalconWebException.newException("Pipeline name can not be blank", - Response.Status.INTERNAL_SERVER_ERROR); - } catch (Exception e) { - LOG.error("Error while fetching entity lineage: ", e); - throw FalconWebException.newException(e, Response.Status.INTERNAL_SERVER_ERROR); + if (processes.isEmpty()) { + throw FalconWebException.newException("No processes belonging to pipeline " + pipeline, + Response.Status.BAD_REQUEST); + } + return Response.ok(buildJSONGraph(processes)).build(); + } else { + throw FalconWebException.newException("Pipeline name can not be blank", + Response.Status.BAD_REQUEST); } } @@ -437,7 +445,7 @@ public class LineageMetadataResource extends AbstractMetadataResource { return response; } - private LineageGraphResult buildJSONGraph(Iterable<Vertex> processes) throws FalconException { + private LineageGraphResult buildJSONGraph(List<Process> processes) { LineageGraphResult result = new LineageGraphResult(); List<String> vertexArray = new LinkedList<String>(); @@ -446,37 +454,32 @@ public class LineageMetadataResource extends AbstractMetadataResource { Map<String, String> feedProducerMap = new HashMap<String, String>(); Map<String, List<String>> feedConsumerMap = new HashMap<String, List<String>>(); - if (processes != null) { - for (Vertex process : processes) { - String processName = process.getProperty(RelationshipProperty.NAME.getName()); + if (processes != null && !processes.isEmpty()) { + for (Process producer : processes) { + String processName = producer.getName(); vertexArray.add(processName); - Process producer = ConfigurationStore.get().get(EntityType.PROCESS, processName); - - if (producer != null) { - if (producer.getOutputs() != null) { - //put all produced feeds in feedProducerMap - for (Output output : producer.getOutputs().getOutputs()) { - feedProducerMap.put(output.getFeed(), processName); - } + if (producer.getOutputs() != null) { + //put all produced feeds in feedProducerMap + for (Output output : producer.getOutputs().getOutputs()) { + feedProducerMap.put(output.getFeed(), processName); } - if (producer.getInputs() != null) { - //put all consumed feeds in feedConsumerMap - for (Input input : producer.getInputs().getInputs()) { - //if feed already exists then append it, else insert it with a list - if (feedConsumerMap.containsKey(input.getFeed())) { - feedConsumerMap.get(input.getFeed()).add(processName); - } else { - List<String> value = new LinkedList<String>(); - value.add(processName); - feedConsumerMap.put(input.getFeed(), value); - } + } + if (producer.getInputs() != null) { + //put all consumed feeds in feedConsumerMap + for (Input input : producer.getInputs().getInputs()) { + //if feed already exists then append it, else insert it with a list + if (feedConsumerMap.containsKey(input.getFeed())) { + feedConsumerMap.get(input.getFeed()).add(processName); + } else { + List<String> value = new LinkedList<String>(); + value.add(processName); + feedConsumerMap.put(input.getFeed(), value); } } } } - LOG.debug("feedProducerMap = {}", feedProducerMap); - LOG.debug("feedConsumerMap = {}", feedConsumerMap); + LOG.debug("feedProducerMap = {}", feedProducerMap); // discard feeds which aren't edges between two processes Set<String> pipelineFeeds = Sets.intersection(feedProducerMap.keySet(), feedConsumerMap.keySet()); for (String feedName : pipelineFeeds) {
