FALCON-256 Create new API for Process dependency graph DAG which captures processes connected via feeds. Contributed by Ajay Yadav
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/45a7b989 Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/45a7b989 Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/45a7b989 Branch: refs/heads/master Commit: 45a7b989bfdad6943dc0090c7cea2e098862c9a9 Parents: 06ffdf9 Author: srikanth.sundarrajan <srik...@apache.org> Authored: Fri Dec 26 10:35:32 2014 +0530 Committer: srikanth.sundarrajan <srik...@apache.org> Committed: Fri Dec 26 10:35:32 2014 +0530 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../apache/falcon/cli/FalconMetadataCLI.java | 17 +- .../org/apache/falcon/client/FalconClient.java | 17 +- .../falcon/resource/LineageGraphResult.java | 165 +++++++++++++++++++ docs/src/site/twiki/FalconCLI.twiki | 17 ++ docs/src/site/twiki/restapi/EntityLineage.twiki | 39 +++++ docs/src/site/twiki/restapi/ResourceList.twiki | 1 + pom.xml | 6 + prism/pom.xml | 7 + .../metadata/LineageMetadataResource.java | 102 ++++++++++++ .../metadata/LineageMetadataResourceTest.java | 8 + .../resource/metadata/MetadataTestContext.java | 18 ++ .../java/org/apache/falcon/cli/FalconCLIIT.java | 24 +++ 13 files changed, 422 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index af4bd9e..5575219 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,9 @@ Trunk (Unreleased) NEW FEATURES IMPROVEMENTS + FALCON-256 Create new API for Process dependency graph DAG which captures + process connected via feeds. 
(Ajay Yadav via Srikanth Sundarrajan) + FALCON-823 Add path matching ability to the radix tree (Ajay Yadav via Srikanth Sundarrajan) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java index 63af415..515d328 100644 --- a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java +++ b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java @@ -54,6 +54,8 @@ public class FalconMetadataCLI { public static final String VERTEX_CMD = "vertex"; public static final String VERTICES_CMD = "vertices"; public static final String VERTEX_EDGES_CMD = "edges"; + public static final String PIPELINE_OPT = "pipeline"; + public static final String EDGE_CMD = "edge"; public static final String ID_OPT = "id"; @@ -78,8 +80,12 @@ public class FalconMetadataCLI { String key = commandLine.getOptionValue(KEY_OPT); String value = commandLine.getOptionValue(VALUE_OPT); String direction = commandLine.getOptionValue(DIRECTION_OPT); + String pipeline = commandLine.getOptionValue(PIPELINE_OPT); - if (optionsList.contains(LIST_OPT)) { + if (optionsList.contains(LINEAGE_OPT)) { + validatePipelineName(pipeline); + result = client.getEntityLineageGraph(pipeline).getDotNotation(); + } else if (optionsList.contains(LIST_OPT)) { validateDimensionType(dimensionType.toUpperCase()); result = client.getDimensionList(dimensionType, cluster); } else if (optionsList.contains(RELATIONS_OPT)) { @@ -105,6 +111,12 @@ public class FalconMetadataCLI { OUT.get().println(result); } + private void validatePipelineName(String pipeline) throws FalconCLIException { + if (StringUtils.isEmpty(pipeline)) { + throw new FalconCLIException("Invalid value for pipeline"); + } + } + private void 
validateDimensionType(String dimensionType) throws FalconCLIException { if (StringUtils.isEmpty(dimensionType) || dimensionType.contains("INSTANCE")) { @@ -157,6 +169,8 @@ public class FalconMetadataCLI { Option lineage = new Option(LINEAGE_OPT, false, "Get falcon metadata lineage information"); group.addOption(discovery); group.addOption(lineage); + Option pipeline = new Option(PIPELINE_OPT, true, + "Get lineage graph for the entities in a pipeline"); metadataOptions.addOptionGroup(group); // Add discovery options @@ -172,6 +186,7 @@ public class FalconMetadataCLI { Option cluster = new Option(CLUSTER_OPT, true, "Cluster name"); // Add lineage options + metadataOptions.addOption(pipeline); metadataOptions.addOption(url); metadataOptions.addOption(type); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 5c476ae..a748c58 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -38,6 +38,7 @@ import org.apache.falcon.resource.EntitySummaryResult; import org.apache.falcon.resource.FeedInstanceResult; import org.apache.falcon.resource.InstancesResult; import org.apache.falcon.resource.InstancesSummaryResult; +import org.apache.falcon.resource.LineageGraphResult; import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; import org.apache.hadoop.security.authentication.client.PseudoAuthenticator; @@ -210,7 +211,8 @@ public class FalconClient { LIST("api/metadata/discovery/", HttpMethod.GET, MediaType.APPLICATION_JSON), RELATIONS("api/metadata/discovery/", HttpMethod.GET, 
MediaType.APPLICATION_JSON), VERTICES("api/metadata/lineage/vertices", HttpMethod.GET, MediaType.APPLICATION_JSON), - EDGES("api/metadata/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON); + EDGES("api/metadata/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON), + LINEAGE("api/metadata/lineage/entities", HttpMethod.GET, MediaType.APPLICATION_JSON); private String path; private String method; @@ -507,6 +509,19 @@ public class FalconClient { return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null, cluster); } + public LineageGraphResult getEntityLineageGraph(String pipelineName) throws FalconCLIException { + MetadataOperations operation = MetadataOperations.LINEAGE; + WebResource resource = service.path(operation.path) + .queryParam(FalconMetadataCLI.PIPELINE_OPT, pipelineName); + + ClientResponse clientResponse = resource + .header("Cookie", AUTH_COOKIE_EQ + authenticationToken) + .accept(operation.mimeType).type(operation.mimeType) + .method(operation.method, ClientResponse.class); + checkIfSuccessful(clientResponse); + return clientResponse.getEntity(LineageGraphResult.class); + } + public String getDimensionRelations(String dimensionType, String dimensionName) throws FalconCLIException { return sendMetadataDiscoveryRequest(MetadataOperations.RELATIONS, dimensionType, dimensionName, null); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java b/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java new file mode 100644 index 0000000..acf5d11 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource; + +import org.apache.commons.lang.StringUtils; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * LineageGraphResult is the output returned by all the apis returning a DAG. + */ +@XmlRootElement(name = "result") +@XmlAccessorType (XmlAccessType.FIELD) +@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) +public class LineageGraphResult { + + private String[] vertices; + + @XmlElement(name="edges") + private Edge[] edges; + + private static final JAXBContext JAXB_CONTEXT; + + static { + try { + JAXB_CONTEXT = JAXBContext.newInstance(LineageGraphResult.class); + } catch (JAXBException e) { + throw new RuntimeException(e); + } + } + + public LineageGraphResult() { + // default constructor for JAXB + } + + /** + * A class to represent an edge in a DAG. 
+ */ + @XmlRootElement(name = "edge") + @XmlAccessorType(XmlAccessType.FIELD) + public static class Edge { + @XmlElement + private String from; + @XmlElement + private String to; + @XmlElement + private String label; + + public Edge() { + + } + + public Edge(String from, String to, String label) { + this.from = from; + this.to = to; + this.label = label; + } + + public String getFrom() { + return from; + } + + public void setFrom(String from) { + this.from = from; + } + + public String getTo() { + return to; + } + + public void setTo(String to) { + this.to = to; + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + public String getDotNotation() { + StringBuilder result = new StringBuilder(); + if (StringUtils.isNotBlank(this.from) && StringUtils.isNotBlank(this.to) + && StringUtils.isNotBlank(this.label)) { + result.append("\"" + this.from +"\""); + result.append(" -> "); + result.append("\"" + this.to + "\""); + result.append(" [ label = \"" + this.label + "\" ] \n"); + } + return result.toString(); + } + + @Override + public String toString() { + return getDotNotation(); + } + + } + + + public String getDotNotation() { + StringBuilder result = new StringBuilder(); + result.append("digraph g{ \n"); + if (this.vertices != null) { + for (String v : this.vertices) { + result.append("\"" + v + "\""); + result.append("\n"); + } + } + + if (this.edges != null) { + for (Edge e : this.edges) { + result.append(e.getDotNotation()); + } + } + result.append("}\n"); + return result.toString(); + } + + public String[] getVertices() { + return vertices; + } + + public void setVertices(String[] vertices) { + this.vertices = vertices; + } + + public Edge[] getEdges() { + return edges; + } + + public void setEdges(Edge[] edges) { + this.edges = edges; + } + + + @Override + public String toString() { + return getDotNotation(); + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/FalconCLI.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki index d8199dd..d37cf8c 100644 --- a/docs/src/site/twiki/FalconCLI.twiki +++ b/docs/src/site/twiki/FalconCLI.twiki @@ -56,6 +56,9 @@ Optional Args : -fields <<field1,field2>> -filterBy <<field1:value1,field2:value <a href="./Restapi/EntityList.html">Optional params described here.</a> + + + ---+++Summary Summary of entities of a particular type and a cluster will be listed. Entity summary has N most recent instances of entity. @@ -255,6 +258,20 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params - ---++ Metadata Lineage Options +---+++Lineage + +Returns the relationship between processes and feeds in a given pipeline in <a href="http://www.graphviz.org/content/dot-language">dot</a> format. +You can use the output and view a graphical representation of DAG using an online graphviz viewer like <a href="http://graphviz-dev.appspot.com/">this</a>. + + +Usage: + +$FALCON_HOME/bin/falcon metadata -lineage -pipeline my-pipeline + +pipeline is a mandatory option. + + + ---+++ Vertex Get the vertex with the specified id. 
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/restapi/EntityLineage.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/EntityLineage.twiki b/docs/src/site/twiki/restapi/EntityLineage.twiki new file mode 100644 index 0000000..ea747b1 --- /dev/null +++ b/docs/src/site/twiki/restapi/EntityLineage.twiki @@ -0,0 +1,39 @@ +---++ GET api/metadata/lineage/entities?pipeline=:pipeline + * <a href="#Description">Description</a> + * <a href="#Parameters">Parameters</a> + * <a href="#Results">Results</a> + * <a href="#Examples">Examples</a> + +---++ Description +It returns the graph depicting the relationship between the various processes and feeds in a given pipeline. + +---++ Parameters + * :pipeline is the name of the pipeline + +---++ Results +It returns a json graph + +---++ Examples +---+++ Rest Call +<verbatim> +GET http://localhost:15000/api/metadata/lineage/entities?pipeline=my-pipeline +</verbatim> +---+++ Result +<verbatim> +{ + "vertices": ["my-minutely-process", "my-hourly-process"], + "edges": + [ + { + "from" : "my-minutely-process", + "to" : "my-hourly-process", + "label" : "my-minutely-feed" + }, + { + "from" : "my-hourly-process", + "to" : "my-minutely-process", + "label" : "my-hourly-feedback" + } + ] +} +</verbatim> http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/restapi/ResourceList.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki index a87818b..2368631 100644 --- a/docs/src/site/twiki/restapi/ResourceList.twiki +++ b/docs/src/site/twiki/restapi/ResourceList.twiki @@ -77,6 +77,7 @@ See also: [[../Security.twiki][Security in Falcon]] | GET | [[AdjacentVertices][api/metadata/lineage/vertices/:id/:direction]] | get the adjacent vertices or edges of the vertex with the 
specified direction | | GET | [[AllEdges][api/metadata/lineage/edges/all]] | get all edges | | GET | [[Edge][api/metadata/lineage/edges/:id]] | get the edge with the specified id | +| GET | [[EntityLineage][api/metadata/lineage/entities?pipeline=:name]] | Get lineage graph for processes and feeds in the specified pipeline | ---++ REST Call on Metadata Discovery Resource http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5a6c095..1b3a6c5 100644 --- a/pom.xml +++ b/pom.xml @@ -625,6 +625,12 @@ </dependency> <dependency> + <groupId>com.tinkerpop.gremlin</groupId> + <artifactId>gremlin-java</artifactId> + <version>2.6.0</version> + </dependency> + + <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>3.0.3.RELEASE</version> http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/pom.xml ---------------------------------------------------------------------- diff --git a/prism/pom.xml b/prism/pom.xml index 26e577d..43cc4b4 100644 --- a/prism/pom.xml +++ b/prism/pom.xml @@ -61,6 +61,13 @@ </dependency> <dependency> + <groupId>com.tinkerpop.gremlin</groupId> + <artifactId>gremlin-java</artifactId> + </dependency> + + + + <dependency> <groupId>org.apache.falcon</groupId> <artifactId>falcon-test-util</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java index 2404be4..0c6b2b6 100644 --- a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java +++ 
b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java @@ -18,6 +18,7 @@ package org.apache.falcon.resource.metadata; +import com.google.common.collect.Sets; import com.tinkerpop.blueprints.Direction; import com.tinkerpop.blueprints.Edge; import com.tinkerpop.blueprints.Element; @@ -25,12 +26,22 @@ import com.tinkerpop.blueprints.Vertex; import com.tinkerpop.blueprints.VertexQuery; import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode; import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility; +import com.tinkerpop.gremlin.java.GremlinPipeline; import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.process.Input; +import org.apache.falcon.entity.v0.process.Output; +import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.metadata.GraphUtils; import org.apache.falcon.metadata.RelationshipLabel; import org.apache.falcon.metadata.RelationshipProperty; import org.apache.falcon.metadata.RelationshipType; +import org.apache.falcon.monitors.Dimension; +import org.apache.falcon.monitors.Monitored; +import org.apache.falcon.resource.LineageGraphResult; import org.apache.falcon.util.StartupProperties; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; @@ -48,7 +59,10 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Set; /** * Jersey Resource for lineage metadata operations. 
@@ -81,6 +95,37 @@ public class LineageMetadataResource extends AbstractMetadataResource { } } + + @GET + @Path("/entities") + @Produces({MediaType.APPLICATION_JSON}) + @Monitored(event = "entity-lineage") + public Response getEntityLineageGraph(@Dimension("pipeline") @QueryParam("pipeline") final String pipeline) { + LOG.info("Get lineage Graph for pipeline:({})", pipeline); + + try { + Iterable<Vertex> processes; + if (StringUtils.isNotBlank(pipeline)) { + Iterable<Vertex> pipelineNode = getGraph().getVertices(RelationshipProperty.NAME.getName(), + pipeline); + if (!pipelineNode.iterator().hasNext()) { + throw FalconWebException.newException("No pipelines found for " + pipeline, + Response.Status.BAD_REQUEST); + } + Vertex v = pipelineNode.iterator().next(); // pipeline names are unique + processes = new GremlinPipeline(v).in(RelationshipLabel.PIPELINES.getName()) + .has(RelationshipProperty.TYPE.getName(), RelationshipType.PROCESS_ENTITY.getName()); + return Response.ok(buildJSONGraph(processes)).build(); + } + throw FalconWebException.newException("Pipeline name can not be blank", + Response.Status.INTERNAL_SERVER_ERROR); + + } catch (Exception e) { + LOG.error("Error while fetching entity lineage: ", e); + throw FalconWebException.newException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + /** * Get all vertices. 
* @@ -392,6 +437,63 @@ public class LineageMetadataResource extends AbstractMetadataResource { return response; } + private LineageGraphResult buildJSONGraph(Iterable<Vertex> processes) throws FalconException { + LineageGraphResult result = new LineageGraphResult(); + + List<String> vertexArray = new LinkedList<String>(); + List<LineageGraphResult.Edge> edgeArray = new LinkedList<LineageGraphResult.Edge>(); + + Map<String, String> feedProducerMap = new HashMap<String, String>(); + Map<String, List<String>> feedConsumerMap = new HashMap<String, List<String>>(); + + if (processes != null) { + for (Vertex process : processes) { + String processName = process.getProperty(RelationshipProperty.NAME.getName()); + vertexArray.add(processName); + Process producer = ConfigurationStore.get().get(EntityType.PROCESS, processName); + + if (producer != null) { + if (producer.getOutputs() != null) { + //put all produced feeds in feedProducerMap + for (Output output : producer.getOutputs().getOutputs()) { + feedProducerMap.put(output.getFeed(), processName); + } + } + if (producer.getInputs() != null) { + //put all consumed feeds in feedConsumerMap + for (Input input : producer.getInputs().getInputs()) { + //if feed already exists then append it, else insert it with a list + if (feedConsumerMap.containsKey(input.getFeed())) { + feedConsumerMap.get(input.getFeed()).add(processName); + } else { + List<String> value = new LinkedList<String>(); + value.add(processName); + feedConsumerMap.put(input.getFeed(), value); + } + } + } + } + } + LOG.debug("feedProducerMap = {}", feedProducerMap); + LOG.debug("feedConsumerMap = {}", feedConsumerMap); + + // discard feeds which aren't edges between two processes + Set<String> pipelineFeeds = Sets.intersection(feedProducerMap.keySet(), feedConsumerMap.keySet()); + for (String feedName : pipelineFeeds) { + String producerProcess = feedProducerMap.get(feedName); + // make an edge from producer to all the consumers + for (String consumerProcess : 
feedConsumerMap.get(feedName)) { + edgeArray.add(new LineageGraphResult.Edge(producerProcess, consumerProcess, feedName)); + } + } + } + + result.setEdges(edgeArray.toArray(new LineageGraphResult.Edge[edgeArray.size()])); + result.setVertices(vertexArray.toArray(new String[vertexArray.size()])); + LOG.debug("result = {}", result); + return result; + } + private static void validateInputs(String errorMsg, String... inputs) { for (String input : inputs) { if (StringUtils.isEmpty(input)) { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java index cabb44c..ac0e51f 100644 --- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java +++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java @@ -351,6 +351,14 @@ public class LineageMetadataResourceTest { } } + @Test + public void testEntityLineage() throws Exception { + testContext.addConsumerProcess(); + LineageMetadataResource resource = new LineageMetadataResource(); + Response response = resource.getEntityLineageGraph("testPipeline"); + Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode()); + } + private void assertBasicVertexProperties(Vertex vertex, Map vertexProperties) { RelationshipProperty[] properties = { RelationshipProperty.NAME, http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java 
b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java index aaddf62..6f798a8 100644 --- a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java +++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java @@ -51,6 +51,7 @@ public class MetadataTestContext { public static final String OPERATION = "GENERATE"; public static final String CLUSTER_ENTITY_NAME = "primary-cluster"; + public static final String CHILD_PROCESS_ENTITY_NAME = "sample-child-process"; public static final String PROCESS_ENTITY_NAME = "sample-process"; public static final String COLO_NAME = "west-coast"; public static final String WORKFLOW_NAME = "imp-click-join-workflow"; @@ -171,6 +172,23 @@ public class MetadataTestContext { configStore.publish(EntityType.PROCESS, processEntity); } + public void addConsumerProcess() throws Exception { + org.apache.falcon.entity.v0.process.Process processEntity = + EntityBuilderTestUtil.buildProcess(CHILD_PROCESS_ENTITY_NAME, + clusterEntity, "classified-as=Critical", "testPipeline"); + EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION); + + for (Feed inputFeed : inputFeeds) { + EntityBuilderTestUtil.addOutput(processEntity, inputFeed); + } + + for (Feed outputFeed : outputFeeds) { + EntityBuilderTestUtil.addInput(processEntity, outputFeed); + } + + configStore.publish(EntityType.PROCESS, processEntity); + } + public void addInstance() throws Exception { WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(), WorkflowExecutionContext.Type.POST_PROCESSING); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java index 9c6ad80..b50999d 
100644 --- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java @@ -487,6 +487,30 @@ public class FalconCLIIT { + " -file " + createTempJobPropertiesFile()), 0); } + + @Test + public void testEntityLineage() throws Exception { + TestContext context = new TestContext(); + Map<String, String> overlay = context.getUniqueOverlay(); + + String filePath; + filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay); + context.setCluster(overlay.get("cluster")); + Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath), 0); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay); + Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), 0); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay); + Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), 0); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); + Assert.assertEquals(executeWithURL("entity -submit -type process -file " + filePath), 0); + + Assert.assertEquals(executeWithURL("metadata -lineage -pipeline testPipeline"), 0); + + } + @Test public void testEntityPaginationFilterByCommands() throws Exception {