FALCON-1101 Cluster submission in falcon does not create an owned-by edge. Contributed by Sowmya Ramesh
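The one-line fix in EntityRelationshipGraphBuilder.addClusterEntity() calls addUserRelation()
on the newly created cluster vertex, so a submitted cluster now gets the same owned-by edge to
its submitting user that feeds and processes already get. As a rough sketch only (this helper is
inherited from RelationshipGraphBuilder and is not part of this diff; the CurrentUser lookup and
the exact signature are assumptions, not code from this commit), the call behaves along these lines:

    // Hypothetical sketch of the inherited RelationshipGraphBuilder helper.
    public void addUserRelation(Vertex fromVertex) {
        // Find or create the vertex for the user submitting the entity
        // (CurrentUser.getUser() is an assumption about where the name comes from).
        Vertex userVertex = addVertex(CurrentUser.getUser(), RelationshipType.USER);
        // Link the entity to its owner; RelationshipLabel.USER is the
        // owned-by label that the tests below verify.
        addEdge(fromVertex, userVertex, RelationshipLabel.USER.getName());
    }

Because of the extra user vertex and edge, the absolute vertex/edge counts asserted in
MetadataMappingServiceTest all shift, which is why the tests now capture before-counts and
assert on deltas instead of hard-coded totals.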
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/e093668a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/e093668a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/e093668a

Branch: refs/heads/master
Commit: e093668a832eb2b9696ab572529c37a55671a907
Parents: 7ffe1a3
Author: Ajay Yadava <[email protected]>
Authored: Mon Jun 15 10:17:06 2015 +0530
Committer: Ajay Yadava <[email protected]>
Committed: Mon Jun 15 10:17:06 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |    4 +-
 .../EntityRelationshipGraphBuilder.java         |    1 +
 .../EntityRelationshipGraphBuilder.java.orig    |  480 ++++++++
 .../metadata/MetadataMappingServiceTest.java    |  160 ++-
 .../MetadataMappingServiceTest.java.orig        | 1107 ++++++++++++++++++
 .../metadata/MetadataDiscoveryResourceTest.java |    4 +-
 6 files changed, 1701 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/e093668a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e0c4333..fbab615 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,7 @@ Trunk (Unreleased)
   IMPROVEMENTS
     FALCON-1114 Oozie findBundles lists a directory and tries to match with the bundle's appPath
     (Pallavi Rao via Ajay Yadava)
-
+    FALCON-1207 Falcon checkstyle allows wildcard imports(Pallavi Rao via Ajay Yadava)
 
     FALCON-1147 Allow _ in the names for name value pair(Sowmya Ramesh via Ajay Yadava)
 
@@ -43,6 +43,8 @@ Trunk (Unreleased)
     (Suhas Vasu)
 
   BUG FIXES
+    FALCON-1101 Cluster submission in falcon does not create an owned-by edge(Sowmya Ramesh via Ajay Yadava)
+
     FALCON-1104 Exception while adding process instance to graphdb when feed has partition expression
     (Pavan Kumar Kolamuri via Ajay Yadava)


http://git-wip-us.apache.org/repos/asf/falcon/blob/e093668a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 7ae7cd9..8c3876c 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -73,6 +73,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
         LOG.info("Adding cluster entity: {}", clusterEntity.getName());
         Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY);
 
+        addUserRelation(clusterVertex);
         addColoRelation(clusterEntity.getColo(), clusterVertex);
         addDataClassification(clusterEntity.getTags(), clusterVertex);
     }


http://git-wip-us.apache.org/repos/asf/falcon/blob/e093668a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig
new file mode 100644
index 0000000..7ae7cd9
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.metadata;
+
+import com.tinkerpop.blueprints.Graph;
+import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Entity Metadata relationship mapping helper.
+ */
+public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EntityRelationshipGraphBuilder.class);
+
+
+    public EntityRelationshipGraphBuilder(Graph graph, boolean preserveHistory) {
+        super(graph, preserveHistory);
+    }
+
+    public void addEntity(Entity entity) {
+        EntityType entityType = entity.getEntityType();
+        switch (entityType) {
+        case CLUSTER:
+            addClusterEntity((Cluster) entity);
+            break;
+        case PROCESS:
+            addProcessEntity((Process) entity);
+            break;
+        case FEED:
+            addFeedEntity((Feed) entity);
+            break;
+        default:
+            throw new IllegalArgumentException("Invalid EntityType " + entityType);
+        }
+    }
+
+    public void addClusterEntity(Cluster clusterEntity) {
+        LOG.info("Adding cluster entity: {}", clusterEntity.getName());
+        Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY);
+
+        addColoRelation(clusterEntity.getColo(), clusterVertex);
+        addDataClassification(clusterEntity.getTags(), clusterVertex);
+    }
+
+    public void addFeedEntity(Feed feed) {
+        LOG.info("Adding feed entity: {}", feed.getName());
+        Vertex feedVertex = addVertex(feed.getName(), RelationshipType.FEED_ENTITY);
+
+        addUserRelation(feedVertex);
+        addDataClassification(feed.getTags(), feedVertex);
+        addGroups(feed.getGroups(), feedVertex);
+
+        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
+            if (ClusterType.TARGET != feedCluster.getType()) {
+                addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE);
+            }
+        }
+    }
+
+    public void updateEntity(Entity oldEntity, Entity newEntity) {
+        EntityType entityType = oldEntity.getEntityType();
+        switch (entityType) {
+        case CLUSTER:
+            // a cluster cannot be updated
+            break;
+        case PROCESS:
+            updateProcessEntity((Process) oldEntity, (Process) newEntity);
+            break;
+        case FEED:
+            updateFeedEntity((Feed) oldEntity, (Feed) newEntity);
+            break;
+        default:
+            throw new IllegalArgumentException("Invalid EntityType " + entityType);
+        }
+    }
+
+
+
+    public void updateFeedEntity(Feed oldFeed, Feed newFeed) {
+        LOG.info("Updating feed entity: {}", newFeed.getName());
+        Vertex feedEntityVertex = findVertex(oldFeed.getName(), RelationshipType.FEED_ENTITY);
+        if (feedEntityVertex == null) {
+            LOG.error("Illegal State: Feed entity vertex must exist for {}", oldFeed.getName());
+            throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist.");
+        }
+
+        updateDataClassification(oldFeed.getTags(), newFeed.getTags(), feedEntityVertex);
+        updateGroups(oldFeed.getGroups(), newFeed.getGroups(), feedEntityVertex);
+        updateFeedClusters(oldFeed.getClusters().getClusters(),
+                newFeed.getClusters().getClusters(), feedEntityVertex);
+    }
+
+    public void addProcessEntity(Process process) {
+        String processName = process.getName();
+        LOG.info("Adding process entity: {}", processName);
+        Vertex processVertex = addVertex(processName, RelationshipType.PROCESS_ENTITY);
+        addWorkflowProperties(process.getWorkflow(), processVertex, processName);
+
+        addUserRelation(processVertex);
+        addDataClassification(process.getTags(), processVertex);
+        addPipelines(process.getPipelines(), processVertex);
+
+        for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
+            addRelationToCluster(processVertex, cluster.getName(), RelationshipLabel.PROCESS_CLUSTER_EDGE);
+        }
+
+        addInputFeeds(process.getInputs(), processVertex);
+        addOutputFeeds(process.getOutputs(), processVertex);
+    }
+
+    public void updateProcessEntity(Process oldProcess, Process newProcess) {
+        LOG.info("Updating process entity: {}", newProcess.getName());
+        Vertex processEntityVertex = findVertex(oldProcess.getName(), RelationshipType.PROCESS_ENTITY);
+        if (processEntityVertex == null) {
+            LOG.error("Illegal State: Process entity vertex must exist for {}", oldProcess.getName());
+            throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist");
+        }
+
+        updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(),
+                processEntityVertex, newProcess.getName());
+        updateDataClassification(oldProcess.getTags(), newProcess.getTags(), processEntityVertex);
+        updatePipelines(oldProcess.getPipelines(), newProcess.getPipelines(), processEntityVertex);
+        updateProcessClusters(oldProcess.getClusters().getClusters(),
+                newProcess.getClusters().getClusters(), processEntityVertex);
+        updateProcessInputs(oldProcess.getInputs(), newProcess.getInputs(), processEntityVertex);
+        updateProcessOutputs(oldProcess.getOutputs(), newProcess.getOutputs(), processEntityVertex);
+    }
+
+    public void addColoRelation(String colo, Vertex fromVertex) {
+        Vertex coloVertex = addVertex(colo, RelationshipType.COLO);
+        addEdge(fromVertex, coloVertex, RelationshipLabel.CLUSTER_COLO.getName());
+    }
+
+    public void addRelationToCluster(Vertex fromVertex, String clusterName, RelationshipLabel edgeLabel) {
+        Vertex clusterVertex = findVertex(clusterName, RelationshipType.CLUSTER_ENTITY);
+        if (clusterVertex == null) { // cluster must exist before adding other entities
+            LOG.error("Illegal State: Cluster entity vertex must exist for {}", clusterName);
+            throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName);
+        }
+
+        addEdge(fromVertex, clusterVertex, edgeLabel.getName());
+    }
+
+    public void addInputFeeds(Inputs inputs, Vertex processVertex) {
+        if (inputs == null) {
+            return;
+        }
+
+        for (Input input : inputs.getInputs()) {
+            addProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE);
+        }
+    }
+
+    public void addOutputFeeds(Outputs outputs, Vertex processVertex) {
+        if (outputs == null) {
+            return;
+        }
+
+        for (Output output : outputs.getOutputs()) {
+            addProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE);
+        }
+    }
+
+    public void addProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
+        Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
+        if (feedVertex == null) {
+            LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
+            throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
+        }
+
+        addProcessFeedEdge(processVertex, feedVertex, edgeLabel);
+    }
+
+    public void addWorkflowProperties(Workflow workflow, Vertex processVertex, String processName) {
+        processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
+                ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
+        processVertex.setProperty(RelationshipProperty.VERSION.getName(), workflow.getVersion());
+        processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
+                workflow.getEngine().value());
+    }
+
+    public void updateWorkflowProperties(Workflow oldWorkflow, Workflow newWorkflow,
+                                         Vertex processEntityVertex, String processName) {
+        if (areSame(oldWorkflow, newWorkflow)) {
+            return;
+        }
+
+        LOG.info("Updating workflow properties for: {}", processEntityVertex);
+        addWorkflowProperties(newWorkflow, processEntityVertex, processName);
+    }
+
+    public void updateDataClassification(String oldClassification, String newClassification,
+                                         Vertex entityVertex) {
+        if (areSame(oldClassification, newClassification)) {
+            return;
+        }
+
+        removeDataClassification(oldClassification, entityVertex);
+        addDataClassification(newClassification, entityVertex);
+    }
+
+    private void removeDataClassification(String classification, Vertex entityVertex) {
+        if (classification == null || classification.length() == 0) {
+            return;
+        }
+
+        String[] oldTags = classification.split(",");
+        for (String oldTag : oldTags) {
+            int index = oldTag.indexOf("=");
+            String tagKey = oldTag.substring(0, index);
+            String tagValue = oldTag.substring(index + 1, oldTag.length());
+
+            removeEdge(entityVertex, tagValue, tagKey);
+        }
+    }
+
+    public void updateGroups(String oldGroups, String newGroups, Vertex entityVertex) {
+        if (areSame(oldGroups, newGroups)) {
+            return;
+        }
+
+        removeGroups(oldGroups, entityVertex);
+        addGroups(newGroups, entityVertex);
+    }
+
+    public void updatePipelines(String oldPipelines, String newPipelines, Vertex entityVertex) {
+        if (areSame(oldPipelines, newPipelines)) {
+            return;
+        }
+
+        removePipelines(oldPipelines, entityVertex);
+        addPipelines(newPipelines, entityVertex);
+    }
+
+    private void removeGroups(String groups, Vertex entityVertex) {
+        removeGroupsOrPipelines(groups, entityVertex, RelationshipLabel.GROUPS);
+    }
+
+    private void removePipelines(String pipelines, Vertex entityVertex) {
+        removeGroupsOrPipelines(pipelines, entityVertex, RelationshipLabel.PIPELINES);
+    }
+
+    private void removeGroupsOrPipelines(String groupsOrPipelines, Vertex entityVertex,
+                                         RelationshipLabel edgeLabel) {
+        if (StringUtils.isEmpty(groupsOrPipelines)) {
+            return;
+        }
+
+        String[] oldGroupOrPipelinesTags = groupsOrPipelines.split(",");
+        for (String groupOrPipelineTag : oldGroupOrPipelinesTags) {
+            removeEdge(entityVertex, groupOrPipelineTag, edgeLabel.getName());
+        }
+    }
+
+    public static boolean areSame(String oldValue, String newValue) {
+        return oldValue == null && newValue == null
+                || oldValue != null && newValue != null && oldValue.equals(newValue);
+    }
+
+    public void updateFeedClusters(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters,
+                                   List<org.apache.falcon.entity.v0.feed.Cluster> newClusters,
+                                   Vertex feedEntityVertex) {
+        if (areFeedClustersSame(oldClusters, newClusters)) {
+            return;
+        }
+
+        // remove edges to old clusters
+        for (org.apache.falcon.entity.v0.feed.Cluster oldCuster : oldClusters) {
+            if (ClusterType.TARGET != oldCuster.getType()) {
+                removeEdge(feedEntityVertex, oldCuster.getName(),
+                        RelationshipLabel.FEED_CLUSTER_EDGE.getName());
+            }
+        }
+
+        // add edges to new clusters
+        for (org.apache.falcon.entity.v0.feed.Cluster newCluster : newClusters) {
+            if (ClusterType.TARGET != newCluster.getType()) {
+                addRelationToCluster(feedEntityVertex, newCluster.getName(),
+                        RelationshipLabel.FEED_CLUSTER_EDGE);
+            }
+        }
+    }
+
+    public boolean areFeedClustersSame(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters,
+                                       List<org.apache.falcon.entity.v0.feed.Cluster> newClusters) {
+        if (oldClusters.size() != newClusters.size()) {
+            return false;
+        }
+
+        List<String> oldClusterNames = getFeedClusterNames(oldClusters);
+        List<String> newClusterNames = getFeedClusterNames(newClusters);
+
+        return oldClusterNames.size() == newClusterNames.size()
+                && oldClusterNames.containsAll(newClusterNames)
+                && newClusterNames.containsAll(oldClusterNames);
+    }
+
+    public List<String> getFeedClusterNames(List<org.apache.falcon.entity.v0.feed.Cluster> clusters) {
+        List<String> clusterNames = new ArrayList<String>(clusters.size());
+        for (org.apache.falcon.entity.v0.feed.Cluster cluster : clusters) {
+            clusterNames.add(cluster.getName());
+        }
+
+        return clusterNames;
+    }
+
+    public void updateProcessClusters(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters,
+                                      List<org.apache.falcon.entity.v0.process.Cluster> newClusters,
+                                      Vertex processEntityVertex) {
+        if (areProcessClustersSame(oldClusters, newClusters)) {
+            return;
+        }
+
+        // remove old clusters
+        for (org.apache.falcon.entity.v0.process.Cluster oldCuster : oldClusters) {
+            removeEdge(processEntityVertex, oldCuster.getName(),
+                    RelationshipLabel.PROCESS_CLUSTER_EDGE.getName());
+        }
+
+        // add new clusters
+        for (org.apache.falcon.entity.v0.process.Cluster newCluster : newClusters) {
+            addRelationToCluster(processEntityVertex, newCluster.getName(),
+                    RelationshipLabel.PROCESS_CLUSTER_EDGE);
+        }
+    }
+
+    public boolean areProcessClustersSame(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters,
+                                          List<org.apache.falcon.entity.v0.process.Cluster> newClusters) {
+        if (oldClusters.size() != newClusters.size()) {
+            return false;
+        }
+
+        List<String> oldClusterNames = getProcessClusterNames(oldClusters);
+        List<String> newClusterNames = getProcessClusterNames(newClusters);
+
+        return oldClusterNames.size() == newClusterNames.size()
+                && oldClusterNames.containsAll(newClusterNames)
+                && newClusterNames.containsAll(oldClusterNames);
+    }
+
+    public List<String> getProcessClusterNames(List<org.apache.falcon.entity.v0.process.Cluster> clusters) {
+        List<String> clusterNames = new ArrayList<String>(clusters.size());
+        for (org.apache.falcon.entity.v0.process.Cluster cluster : clusters) {
+            clusterNames.add(cluster.getName());
+        }
+
+        return clusterNames;
+    }
+
+    public static boolean areSame(Workflow oldWorkflow, Workflow newWorkflow) {
+        return areSame(oldWorkflow.getName(), newWorkflow.getName())
+                && areSame(oldWorkflow.getVersion(), newWorkflow.getVersion())
+                && areSame(oldWorkflow.getEngine().value(), newWorkflow.getEngine().value());
+    }
+
+    private void updateProcessInputs(Inputs oldProcessInputs, Inputs newProcessInputs,
+                                     Vertex processEntityVertex) {
+        if (areSame(oldProcessInputs, newProcessInputs)) {
+            return;
+        }
+
+        removeInputFeeds(oldProcessInputs, processEntityVertex);
+        addInputFeeds(newProcessInputs, processEntityVertex);
+    }
+
+    public static boolean areSame(Inputs oldProcessInputs, Inputs newProcessInputs) {
+        if (oldProcessInputs == null && newProcessInputs == null) {
+            return true;
+        }
+
+        if (oldProcessInputs == null || newProcessInputs == null
+                || oldProcessInputs.getInputs().size() != newProcessInputs.getInputs().size()) {
+            return false;
+        }
+
+        List<Input> oldInputs = oldProcessInputs.getInputs();
+        List<Input> newInputs = newProcessInputs.getInputs();
+
+        return oldInputs.size() == newInputs.size()
+                && oldInputs.containsAll(newInputs)
+                && newInputs.containsAll(oldInputs);
+    }
+
+    public void removeInputFeeds(Inputs inputs, Vertex processVertex) {
+        if (inputs == null) {
+            return;
+        }
+
+        for (Input input : inputs.getInputs()) {
+            removeProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE);
+        }
+    }
+
+    public void removeOutputFeeds(Outputs outputs, Vertex processVertex) {
+        if (outputs == null) {
+            return;
+        }
+
+        for (Output output : outputs.getOutputs()) {
+            removeProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE);
+        }
+    }
+
+    public void removeProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
+        Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
+        if (feedVertex == null) {
+            LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
+            throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
+        }
+
+        if (edgeLabel == RelationshipLabel.FEED_PROCESS_EDGE) {
+            removeEdge(feedVertex, processVertex, edgeLabel.getName());
+        } else {
+            removeEdge(processVertex, feedVertex, edgeLabel.getName());
+        }
+    }
+
+    private void updateProcessOutputs(Outputs oldProcessOutputs, Outputs newProcessOutputs,
+                                      Vertex processEntityVertex) {
+        if (areSame(oldProcessOutputs, newProcessOutputs)) {
+            return;
+        }
+
+        removeOutputFeeds(oldProcessOutputs, processEntityVertex);
+        addOutputFeeds(newProcessOutputs, processEntityVertex);
+    }
+
+    public static boolean areSame(Outputs oldProcessOutputs, Outputs newProcessOutputs) {
+        if (oldProcessOutputs == null && newProcessOutputs == null) {
+            return true;
+        }
+
+        if (oldProcessOutputs == null || newProcessOutputs == null
+                || oldProcessOutputs.getOutputs().size() != newProcessOutputs.getOutputs().size()) {
+            return false;
+        }
+
+        List<Output> oldOutputs = oldProcessOutputs.getOutputs();
+        List<Output> newOutputs = newProcessOutputs.getOutputs();
+
+        return oldOutputs.size() == newOutputs.size()
+                && oldOutputs.containsAll(newOutputs)
+                && newOutputs.containsAll(oldOutputs);
+    }
+}


http://git-wip-us.apache.org/repos/asf/falcon/blob/e093668a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 30eeaa4..f70b446 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -105,8 +105,8 @@ public class MetadataMappingServiceTest {
 
     private Cluster clusterEntity;
     private Cluster anotherCluster;
-    private List<Feed> inputFeeds = new ArrayList<Feed>();
-    private List<Feed> outputFeeds = new ArrayList<Feed>();
+    private List<Feed> inputFeeds = new ArrayList<>();
+    private List<Feed> outputFeeds = new ArrayList<>();
     private Process processEntity;
 
     @BeforeClass
@@ -153,57 +153,82 @@ public class MetadataMappingServiceTest {
 
     @Test
     public void testOnAddClusterEntity() throws Exception {
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
         clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
                 "classification=production");
 
         verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
         verifyClusterEntityEdges();
 
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag
+        // +4 = cluster, colo, tag, user
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 4);
+        // +3 = cluster to colo, user and tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 3);
     }
 
     @Test (dependsOnMethods = "testOnAddClusterEntity")
     public void testOnAddFeedEntity() throws Exception {
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
                 "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
         inputFeeds.add(impressionsFeed);
         verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
         verifyFeedEntityEdges(impressionsFeed.getName(), "Secure", "analytics");
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
+        // +3 = feed, tag, group (the user vertex already exists)
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 4); // +4 = cluster, tag, group, user
 
+        // Get the before vertices and edges
+        beforeVerticesCount = getVerticesCount(service.getGraph());
+        beforeEdgesCount = getEdgesCount(service.getGraph());
         Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
                 "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
        inputFeeds.add(clicksFeed);
         verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 9); // feed and financial vertex
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 11); // +5 = cluster + user + 2Group + Tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 2); // feed and financial vertex
+        // +5 = cluster + user + group + 2 tags
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 5);
 
+        // Get the before vertices and edges
+        beforeVerticesCount = getVerticesCount(service.getGraph());
+        beforeEdgesCount = getEdgesCount(service.getGraph());
         Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
                 "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                 "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
         outputFeeds.add(join1Feed);
         verifyEntityWasAddedToGraph(join1Feed.getName(), RelationshipType.FEED_ENTITY);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 12); // + 3 = 1 feed and 2 groups
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 16); // +5 = cluster + user +
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3); // +3 = 1 feed and 2 groups
+        // +5 = cluster + user + 2 groups + tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 5);
 
+        // Get the before vertices and edges
+        beforeVerticesCount = getVerticesCount(service.getGraph());
+        beforeEdgesCount = getEdgesCount(service.getGraph());
         Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
                 "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                 "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
         outputFeeds.add(join2Feed);
         verifyEntityWasAddedToGraph(join2Feed.getName(), RelationshipType.FEED_ENTITY);
 
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 13); // +1 feed
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1); // +1 feed
         // +6 = user + 2tags + 2Groups + Cluster
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 22);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6);
     }
 
     @Test (dependsOnMethods = "testOnAddFeedEntity")
     public void testOnAddProcessEntity() throws Exception {
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
                 "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
                 WORKFLOW_VERSION, inputFeeds, outputFeeds);
@@ -212,9 +237,9 @@ public class MetadataMappingServiceTest {
         verifyProcessEntityEdges();
 
         // +4 = 1 process + 1 tag + 2 pipeline
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 17);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 4);
         // +9 = user,tag,cluster, 2 inputs,2 outputs, 2 pipelines
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 31);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 9);
     }
 
     @Test (dependsOnMethods = "testOnAddProcessEntity")
@@ -226,6 +251,9 @@ public class MetadataMappingServiceTest {
     public void testMapLineage() throws Exception {
         setup();
 
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                 EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null)
                 , WorkflowExecutionContext.Type.POST_PROCESSING);
@@ -236,15 +264,18 @@ public class MetadataMappingServiceTest {
         verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
 
         // +6 = 1 process, 2 inputs = 3 instances,2 outputs
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 23);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 6);
         //+40 = +26 for feed instances + 8 for process instance + 6 for second feed instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 71);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 40);
     }
 
     @Test
     public void testLineageForNoDateInFeedPath() throws Exception {
         setupForNoDateInFeedPath();
 
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                 EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null,
                 OUTPUT_INSTANCE_PATHS_NO_DATE, INPUT_INSTANCE_PATHS_NO_DATE, null),
@@ -262,15 +293,30 @@ public class MetadataMappingServiceTest {
         Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
 
         // +5 = 1 process, 2 inputs, 2 outputs
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 22);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 5);
         //+34 = +26 for feed instances + 8 for process instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 65);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 34);
     }
 
     @Test
     public void testLineageForReplication() throws Exception {
         setupForLineageReplication();
 
+        // Get the before vertices and edges
+        // +7 [primary, bcp cluster] = cluster, colo, tag, user
+        // +3 [input feed] = feed, tag, group
+        // +4 [output feed] = 1 feed + 1 tag + 2 groups
+        // +4 [process] = 1 process + 1 tag + 2 pipeline
+        // +3 = 1 process, 1 input, 1 output
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+
+        // +4 [cluster] = cluster to colo and tag [primary and bcp],
+        // +4 [input feed] = cluster, tag, group, user
+        // +5 [output feed] = cluster + user + Group + 2Tags
+        // +7 = user,tag,cluster, 1 input,1 output, 2 pipelines
+        // +19 = +6 for output feed instances + 7 for process instance + 6 for input feed instance
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                 EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
                 "jail://global:00/falcon/raw-click/bcp/20140101",
@@ -285,19 +331,11 @@ public class MetadataMappingServiceTest {
                 "jail://global:00/falcon/raw-click/bcp/20140101", context,
                 RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
 
-        // +6 [primary, bcp cluster] = cluster, colo, tag,
-        // +4 [input feed] = feed, tag, group, user
-        // +4 [output feed] = 1 feed + 1 tag + 2 groups
-        // +4 [process] = 1 process + 1 tag + 2 pipeline
-        // +3 = 1 process, 1 input, 1 output
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 21);
-        // +4 [cluster] = cluster to colo and tag [primary and bcp],
-        // +4 [input feed] = cluster, tag, group, user
-        // +5 [output feed] = cluster + user + Group + 2Tags
-        // +7 = user,tag,cluster, 1 input,1 output, 2 pipelines
-        // +19 = +6 for output feed instances + 7 for process instance + 6 for input feed instance
+        // No new vertex added after replication
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0);
+
         // +1 for replicated-to edge to target cluster for each output feed instance
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 40);
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 1);
     }
 
     @Test
@@ -333,6 +371,10 @@ public class MetadataMappingServiceTest {
     @Test
     public void testLineageForRetention() throws Exception {
         setupForLineageEviction();
+        // Get the before vertices and edges
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                 EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
                 EVICTED_FEED, EVICTED_INSTANCE_PATHS, "IGNORE", EVICTED_FEED),
@@ -356,11 +398,9 @@ public class MetadataMappingServiceTest {
         }
 
         // No new vertices added
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 23);
-        // +1 = +2 for evicted-from edge from Feed Instance vertex to cluster.
-        // -1 imp-click-join1 is added twice instead of imp-click-join2 so there is one less edge as there is no
-        // classified-as -> Secure edge.
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 72);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0);
+        // +2 for evicted-from edge from Feed Instance vertex to cluster
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 2);
     }
 
     @Test
@@ -390,14 +430,17 @@ public class MetadataMappingServiceTest {
         service.destroy();
         service.init();
 
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         // cannot modify cluster, adding a new cluster
         anotherCluster = addClusterEntity("another-cluster", "east-coast",
                 "classification=another");
         verifyEntityWasAddedToGraph("another-cluster", RelationshipType.CLUSTER_ENTITY);
 
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 20); // +3 = cluster, colo, tag
-        // +2 edges to above, no user but only to colo and new tag
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 33);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3); // +3 = cluster, colo, tag
+        // +3 edges to user, colo and new tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 3);
     }
 
     @Test(dependsOnMethods = "testOnChange")
@@ -408,9 +451,15 @@ public class MetadataMappingServiceTest {
         addStorage(newFeed, Storage.TYPE.FILESYSTEM,
                 "jail://global:00/falcon/impression-feed/20140101");
 
+        long beforeVerticesCount = 0;
+        long beforeEdgesCount = 0;
+
         try {
             configStore.initiateUpdate(newFeed);
 
+            beforeVerticesCount = getVerticesCount(service.getGraph());
+            beforeEdgesCount = getEdgesCount(service.getGraph());
+
             // add cluster
             org.apache.falcon.entity.v0.feed.Cluster feedCluster =
                     new org.apache.falcon.entity.v0.feed.Cluster();
@@ -423,8 +472,8 @@ public class MetadataMappingServiceTest {
         }
 
         verifyUpdatedEdges(newFeed);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 22); //+2 = 2 new tags
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 35); // +2 = 1 new cluster, 1 new tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 2); //+2 = 2 new tags
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 2); // +2 = 1 new cluster, 1 new tag
     }
 
     @Test
@@ -433,8 +482,8 @@ public class MetadataMappingServiceTest {
                 "classification=production");
         verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
         verifyClusterEntityEdges();
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 4); // +4 = cluster, colo, user, tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 3); // +3 = cluster to colo, user and tag
 
         Feed feed = EntityBuilderTestUtil.buildFeed("feed-name", new Cluster[]{clusterEntity}, null, null);
         inputFeeds.add(feed);
@@ -446,8 +495,8 @@ public class MetadataMappingServiceTest {
                     WORKFLOW_VERSION, inputFeeds, outputFeeds);
             Assert.fail();
         } catch (FalconException e) {
-            Assert.assertEquals(getVerticesCount(service.getGraph()), 3);
-            Assert.assertEquals(getEdgesCount(service.getGraph()), 2);
+            Assert.assertEquals(getVerticesCount(service.getGraph()), 4);
+            Assert.assertEquals(getEdgesCount(service.getGraph()), 3);
         }
     }
 
@@ -466,7 +515,7 @@ public class MetadataMappingServiceTest {
             Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "data-warehouse");
 
         // new cluster
-        List<String> actual = new ArrayList<String>();
+        List<String> actual = new ArrayList<>();
         for (Edge clusterEdge : feedVertex.getEdges(Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName())) {
             actual.add(clusterEdge.getVertex(Direction.IN).<String>getProperty("name"));
         }
@@ -476,6 +525,9 @@ public class MetadataMappingServiceTest {
 
     @Test(dependsOnMethods = "testOnFeedEntityChange")
     public void testOnProcessEntityChange() throws Exception {
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
         Process oldProcess = processEntity;
         Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), anotherCluster,
                 null, null);
@@ -490,8 +542,9 @@ public class MetadataMappingServiceTest {
         }
 
         verifyUpdatedEdges(newProcess);
-        Assert.assertEquals(getVerticesCount(service.getGraph()), 22); // +0, no net new
-        Assert.assertEquals(getEdgesCount(service.getGraph()), 29); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0); // +0, no net new
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount - 6); // -6 = -2 outputs, -1 tag,
+                                                                                      // -1 cluster, -2 pipelines
     }
 
     @Test(dependsOnMethods = "testOnProcessEntityChange")
@@ -686,7 +739,7 @@ public class MetadataMappingServiceTest {
                 RelationshipType.PROCESS_ENTITY);
 
         // verify edge to cluster vertex
-        verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName(),
+        verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.PROCESS_CLUSTER_EDGE.getName(),
                 CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY.getName());
         // verify edge to user vertex
         verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.USER.getName(),
@@ -696,7 +749,7 @@ public class MetadataMappingServiceTest {
                 "Critical", RelationshipType.TAGS.getName());
 
         // verify edges to inputs
-        List<String> actual = new ArrayList<String>();
+        List<String> actual = new ArrayList<>();
         for (Edge edge : processVertex.getEdges(Direction.IN,
                 RelationshipLabel.FEED_PROCESS_EDGE.getName())) {
             Vertex outVertex = edge.getVertex(Direction.OUT);
@@ -736,13 +789,16 @@ public class MetadataMappingServiceTest {
 
     private void verifyVertexForEdge(Vertex fromVertex, Direction direction, String label,
                                      String expectedName, String expectedType) {
+        boolean found = false;
         for (Edge edge : fromVertex.getEdges(direction, label)) {
+            found = true;
            Vertex outVertex = edge.getVertex(Direction.IN);
            Assert.assertEquals(
                    outVertex.getProperty(RelationshipProperty.NAME.getName()), expectedName);
            Assert.assertEquals(
                    outVertex.getProperty(RelationshipProperty.TYPE.getName()), expectedType);
         }
+        Assert.assertTrue(found, "Edge not found");
     }
 
     private void verifyEntityGraph(RelationshipType feedType, String classification) {
@@ -767,7 +823,7 @@ public class MetadataMappingServiceTest {
                 .has(RelationshipProperty.NAME.getName(), FALCON_USER)
                 .has(RelationshipProperty.TYPE.getName(), RelationshipType.USER.getName());
 
-        List<String> feedNames = new ArrayList<String>();
+        List<String> feedNames = new ArrayList<>();
         for (Vertex userVertex : userQuery.vertices()) {
             for (Vertex feed : userVertex.getVertices(Direction.IN, RelationshipLabel.USER.getName())) {
                 if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) {
@@ -785,7 +841,7 @@ public class MetadataMappingServiceTest {
                 .has(RelationshipProperty.NAME.getName(), "Secure")
                 .has(RelationshipProperty.TYPE.getName(), RelationshipType.TAGS.getName());
 
-        List<String> actual = new ArrayList<String>();
+        List<String> actual = new ArrayList<>();
         for (Vertex feedVertex : classQuery.vertices()) {
             for (Vertex feed : feedVertex.getVertices(Direction.BOTH, "classified-as")) {
                 if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) {
@@ -800,7 +856,7 @@ public class MetadataMappingServiceTest {
 
     private void verifyFeedsOwnedByUserAndClassification(String feedType, String classification,
                                                          List<String> expected) {
-        List<String> actual = new ArrayList<String>();
+        List<String> actual = new ArrayList<>();
         Vertex userVertex = getEntityVertex(FALCON_USER, RelationshipType.USER);
         for (Vertex feed : userVertex.getVertices(Direction.IN, RelationshipLabel.USER.getName())) {
             if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) {
