Repository: falcon Updated Branches: refs/heads/master 460dd209a -> 53bd6c38e
FALCON-1101 Remove the .orig files introduced by commit e093668a832eb2b9696ab572529c37a55671a907. Contributed by Sowmya Ramesh Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/53bd6c38 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/53bd6c38 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/53bd6c38 Branch: refs/heads/master Commit: 53bd6c38e76b6bddb45f6015bf70441c7e9099e1 Parents: 460dd20 Author: Sowmya Ramesh <[email protected]> Authored: Tue Jun 23 15:47:06 2015 -0700 Committer: Sowmya Ramesh <[email protected]> Committed: Tue Jun 23 15:47:06 2015 -0700 ---------------------------------------------------------------------- .../EntityRelationshipGraphBuilder.java.orig | 480 -------- .../MetadataMappingServiceTest.java.orig | 1107 ------------------ 2 files changed, 1587 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/53bd6c38/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig deleted file mode 100644 index 7ae7cd9..0000000 --- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java.orig +++ /dev/null @@ -1,480 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.metadata; - -import com.tinkerpop.blueprints.Graph; -import com.tinkerpop.blueprints.Vertex; -import org.apache.falcon.entity.ProcessHelper; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Inputs; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Outputs; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.entity.v0.process.Workflow; -import org.apache.falcon.workflow.WorkflowExecutionArgs; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; - -/** - * Entity Metadata relationship mapping helper. - */ -public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder { - - private static final Logger LOG = LoggerFactory.getLogger(EntityRelationshipGraphBuilder.class); - - - public EntityRelationshipGraphBuilder(Graph graph, boolean preserveHistory) { - super(graph, preserveHistory); - } - - public void addEntity(Entity entity) { - EntityType entityType = entity.getEntityType(); - switch (entityType) { - case CLUSTER: - addClusterEntity((Cluster) entity); - break; - case PROCESS: - addProcessEntity((Process) entity); - break; - case FEED: - addFeedEntity((Feed) entity); - break; - default: - throw new IllegalArgumentException("Invalid EntityType " + entityType); - } - } - - public void addClusterEntity(Cluster clusterEntity) { - LOG.info("Adding cluster entity: {}", clusterEntity.getName()); - Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY); - - addColoRelation(clusterEntity.getColo(), clusterVertex); - addDataClassification(clusterEntity.getTags(), clusterVertex); - } - - public void addFeedEntity(Feed feed) { - LOG.info("Adding feed entity: {}", feed.getName()); - Vertex feedVertex = addVertex(feed.getName(), RelationshipType.FEED_ENTITY); - - addUserRelation(feedVertex); - addDataClassification(feed.getTags(), feedVertex); - addGroups(feed.getGroups(), feedVertex); - - for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) { - if (ClusterType.TARGET != feedCluster.getType()) { - addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE); - } - } - } - - public void updateEntity(Entity oldEntity, Entity newEntity) { - EntityType entityType = oldEntity.getEntityType(); - switch (entityType) { - case CLUSTER: - // a cluster cannot be updated - break; - case PROCESS: - updateProcessEntity((Process) oldEntity, (Process) newEntity); - break; - case FEED: - updateFeedEntity((Feed) oldEntity, (Feed) newEntity); - break; - default: - throw new IllegalArgumentException("Invalid EntityType " + entityType); - } - } - - - - public void updateFeedEntity(Feed oldFeed, Feed newFeed) { - LOG.info("Updating feed entity: {}", newFeed.getName()); - Vertex feedEntityVertex = findVertex(oldFeed.getName(), RelationshipType.FEED_ENTITY); - if (feedEntityVertex == null) { - LOG.error("Illegal State: Feed entity vertex must exist for {}", oldFeed.getName()); - throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist."); - } - - updateDataClassification(oldFeed.getTags(), newFeed.getTags(), feedEntityVertex); - updateGroups(oldFeed.getGroups(), newFeed.getGroups(), feedEntityVertex); - updateFeedClusters(oldFeed.getClusters().getClusters(), - newFeed.getClusters().getClusters(), feedEntityVertex); - } - - public void addProcessEntity(Process process) { - String processName = process.getName(); - LOG.info("Adding process entity: {}", processName); - Vertex processVertex = addVertex(processName, RelationshipType.PROCESS_ENTITY); - addWorkflowProperties(process.getWorkflow(), processVertex, processName); - - addUserRelation(processVertex); - addDataClassification(process.getTags(), processVertex); - addPipelines(process.getPipelines(), processVertex); - - for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) { - addRelationToCluster(processVertex, cluster.getName(), RelationshipLabel.PROCESS_CLUSTER_EDGE); - } - - addInputFeeds(process.getInputs(), processVertex); - addOutputFeeds(process.getOutputs(), processVertex); - } - - public void updateProcessEntity(Process oldProcess, Process newProcess) { - LOG.info("Updating process entity: {}", newProcess.getName()); - Vertex processEntityVertex = findVertex(oldProcess.getName(), RelationshipType.PROCESS_ENTITY); - if (processEntityVertex == null) { - LOG.error("Illegal State: Process entity vertex must exist for {}", oldProcess.getName()); - throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist"); - } - - updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(), - processEntityVertex, newProcess.getName()); - updateDataClassification(oldProcess.getTags(), newProcess.getTags(), processEntityVertex); - updatePipelines(oldProcess.getPipelines(), newProcess.getPipelines(), processEntityVertex); - updateProcessClusters(oldProcess.getClusters().getClusters(), - newProcess.getClusters().getClusters(), processEntityVertex); - updateProcessInputs(oldProcess.getInputs(), newProcess.getInputs(), processEntityVertex); - updateProcessOutputs(oldProcess.getOutputs(), newProcess.getOutputs(), processEntityVertex); - } - - public void addColoRelation(String colo, Vertex fromVertex) { - Vertex coloVertex = addVertex(colo, RelationshipType.COLO); - addEdge(fromVertex, coloVertex, RelationshipLabel.CLUSTER_COLO.getName()); - } - - public void addRelationToCluster(Vertex fromVertex, String clusterName, RelationshipLabel edgeLabel) { - Vertex clusterVertex = findVertex(clusterName, RelationshipType.CLUSTER_ENTITY); - if (clusterVertex == null) { // cluster must exist before adding other entities - LOG.error("Illegal State: Cluster entity vertex must exist for {}", clusterName); - throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName); - } - - addEdge(fromVertex, clusterVertex, edgeLabel.getName()); - } - - public void addInputFeeds(Inputs inputs, Vertex processVertex) { - if (inputs == null) { - return; - } - - for (Input input : inputs.getInputs()) { - addProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE); - } - } - - public void addOutputFeeds(Outputs outputs, Vertex processVertex) { - if (outputs == null) { - return; - } - - for (Output output : outputs.getOutputs()) { - addProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE); - } - } - - public void addProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) { - Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY); - if (feedVertex == null) { - LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName); - throw new IllegalStateException("Feed entity vertex must exist: " + feedName); - } - - addProcessFeedEdge(processVertex, feedVertex, edgeLabel); - } - - public void addWorkflowProperties(Workflow workflow, Vertex processVertex, String processName) { - processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), - ProcessHelper.getProcessWorkflowName(workflow.getName(), processName)); - processVertex.setProperty(RelationshipProperty.VERSION.getName(), workflow.getVersion()); - processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), - workflow.getEngine().value()); - } - - public void updateWorkflowProperties(Workflow oldWorkflow, Workflow newWorkflow, - Vertex processEntityVertex, String processName) { - if (areSame(oldWorkflow, newWorkflow)) { - return; - } - - LOG.info("Updating workflow properties for: {}", processEntityVertex); - addWorkflowProperties(newWorkflow, processEntityVertex, processName); - } - - public void updateDataClassification(String oldClassification, String newClassification, - Vertex entityVertex) { - if (areSame(oldClassification, newClassification)) { - return; - } - - removeDataClassification(oldClassification, entityVertex); - addDataClassification(newClassification, entityVertex); - } - - private void removeDataClassification(String classification, Vertex entityVertex) { - if (classification == null || classification.length() == 0) { - return; - } - - String[] oldTags = classification.split(","); - for (String oldTag : oldTags) { - int index = oldTag.indexOf("="); - String tagKey = oldTag.substring(0, index); - String tagValue = oldTag.substring(index + 1, oldTag.length()); - - removeEdge(entityVertex, tagValue, tagKey); - } - } - - public void updateGroups(String oldGroups, String newGroups, Vertex entityVertex) { - if (areSame(oldGroups, newGroups)) { - return; - } - - removeGroups(oldGroups, entityVertex); - addGroups(newGroups, entityVertex); - } - - public void updatePipelines(String oldPipelines, String newPipelines, Vertex entityVertex) { - if (areSame(oldPipelines, newPipelines)) { - return; - } - - removePipelines(oldPipelines, entityVertex); - addPipelines(newPipelines, entityVertex); - } - - private void removeGroups(String groups, Vertex entityVertex) { - removeGroupsOrPipelines(groups, entityVertex, RelationshipLabel.GROUPS); - } - - private void removePipelines(String pipelines, Vertex entityVertex) { - removeGroupsOrPipelines(pipelines, entityVertex, RelationshipLabel.PIPELINES); - } - - private void removeGroupsOrPipelines(String groupsOrPipelines, Vertex entityVertex, - RelationshipLabel edgeLabel) { - if (StringUtils.isEmpty(groupsOrPipelines)) { - return; - } - - String[] oldGroupOrPipelinesTags = groupsOrPipelines.split(","); - for (String groupOrPipelineTag : oldGroupOrPipelinesTags) { - removeEdge(entityVertex, groupOrPipelineTag, edgeLabel.getName()); - } - } - - public static boolean areSame(String oldValue, String newValue) { - return oldValue == null && newValue == null - || oldValue != null && newValue != null && oldValue.equals(newValue); - } - - public void updateFeedClusters(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters, - List<org.apache.falcon.entity.v0.feed.Cluster> newClusters, - Vertex feedEntityVertex) { - if (areFeedClustersSame(oldClusters, newClusters)) { - return; - } - - // remove edges to old clusters - for (org.apache.falcon.entity.v0.feed.Cluster oldCuster : oldClusters) { - if (ClusterType.TARGET != oldCuster.getType()) { - removeEdge(feedEntityVertex, oldCuster.getName(), - RelationshipLabel.FEED_CLUSTER_EDGE.getName()); - } - } - - // add edges to new clusters - for (org.apache.falcon.entity.v0.feed.Cluster newCluster : newClusters) { - if (ClusterType.TARGET != newCluster.getType()) { - addRelationToCluster(feedEntityVertex, newCluster.getName(), - RelationshipLabel.FEED_CLUSTER_EDGE); - } - } - } - - public boolean areFeedClustersSame(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters, - List<org.apache.falcon.entity.v0.feed.Cluster> newClusters) { - if (oldClusters.size() != newClusters.size()) { - return false; - } - - List<String> oldClusterNames = getFeedClusterNames(oldClusters); - List<String> newClusterNames = getFeedClusterNames(newClusters); - - return oldClusterNames.size() == newClusterNames.size() - && oldClusterNames.containsAll(newClusterNames) - && newClusterNames.containsAll(oldClusterNames); - } - - public List<String> getFeedClusterNames(List<org.apache.falcon.entity.v0.feed.Cluster> clusters) { - List<String> clusterNames = new ArrayList<String>(clusters.size()); - for (org.apache.falcon.entity.v0.feed.Cluster cluster : clusters) { - clusterNames.add(cluster.getName()); - } - - return clusterNames; - } - - public void updateProcessClusters(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters, - List<org.apache.falcon.entity.v0.process.Cluster> newClusters, - Vertex processEntityVertex) { - if (areProcessClustersSame(oldClusters, newClusters)) { - return; - } - - // remove old clusters - for (org.apache.falcon.entity.v0.process.Cluster oldCuster : oldClusters) { - removeEdge(processEntityVertex, oldCuster.getName(), - RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()); - } - - // add new clusters - for (org.apache.falcon.entity.v0.process.Cluster newCluster : newClusters) { - addRelationToCluster(processEntityVertex, newCluster.getName(), - RelationshipLabel.PROCESS_CLUSTER_EDGE); - } - } - - public boolean areProcessClustersSame(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters, - List<org.apache.falcon.entity.v0.process.Cluster> newClusters) { - if (oldClusters.size() != newClusters.size()) { - return false; - } - - List<String> oldClusterNames = getProcessClusterNames(oldClusters); - List<String> newClusterNames = getProcessClusterNames(newClusters); - - return oldClusterNames.size() == newClusterNames.size() - && oldClusterNames.containsAll(newClusterNames) - && newClusterNames.containsAll(oldClusterNames); - } - - public List<String> getProcessClusterNames(List<org.apache.falcon.entity.v0.process.Cluster> clusters) { - List<String> clusterNames = new ArrayList<String>(clusters.size()); - for (org.apache.falcon.entity.v0.process.Cluster cluster : clusters) { - clusterNames.add(cluster.getName()); - } - - return clusterNames; - } - - public static boolean areSame(Workflow oldWorkflow, Workflow newWorkflow) { - return areSame(oldWorkflow.getName(), newWorkflow.getName()) - && areSame(oldWorkflow.getVersion(), newWorkflow.getVersion()) - && areSame(oldWorkflow.getEngine().value(), newWorkflow.getEngine().value()); - } - - private void updateProcessInputs(Inputs oldProcessInputs, Inputs newProcessInputs, - Vertex processEntityVertex) { - if (areSame(oldProcessInputs, newProcessInputs)) { - return; - } - - removeInputFeeds(oldProcessInputs, processEntityVertex); - addInputFeeds(newProcessInputs, processEntityVertex); - } - - public static boolean areSame(Inputs oldProcessInputs, Inputs newProcessInputs) { - if (oldProcessInputs == null && newProcessInputs == null) { - return true; - } - - if (oldProcessInputs == null || newProcessInputs == null - || oldProcessInputs.getInputs().size() != newProcessInputs.getInputs().size()) { - return false; - } - - List<Input> oldInputs = oldProcessInputs.getInputs(); - List<Input> newInputs = newProcessInputs.getInputs(); - - return oldInputs.size() == newInputs.size() - && oldInputs.containsAll(newInputs) - && newInputs.containsAll(oldInputs); - } - - public void removeInputFeeds(Inputs inputs, Vertex processVertex) { - if (inputs == null) { - return; - } - - for (Input input : inputs.getInputs()) { - removeProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE); - } - } - - public void removeOutputFeeds(Outputs outputs, Vertex processVertex) { - if (outputs == null) { - return; - } - - for (Output output : outputs.getOutputs()) { - removeProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE); - } - } - - public void removeProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) { - Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY); - if (feedVertex == null) { - LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName); - throw new IllegalStateException("Feed entity vertex must exist: " + feedName); - } - - if (edgeLabel == RelationshipLabel.FEED_PROCESS_EDGE) { - removeEdge(feedVertex, processVertex, edgeLabel.getName()); - } else { - removeEdge(processVertex, feedVertex, edgeLabel.getName()); - } - } - - private void updateProcessOutputs(Outputs oldProcessOutputs, Outputs newProcessOutputs, - Vertex processEntityVertex) { - if (areSame(oldProcessOutputs, newProcessOutputs)) { - return; - } - - removeOutputFeeds(oldProcessOutputs, processEntityVertex); - addOutputFeeds(newProcessOutputs, processEntityVertex); - } - - public static boolean areSame(Outputs oldProcessOutputs, Outputs newProcessOutputs) { - if (oldProcessOutputs == null && newProcessOutputs == null) { - return true; - } - - if (oldProcessOutputs == null || newProcessOutputs == null - || oldProcessOutputs.getOutputs().size() != newProcessOutputs.getOutputs().size()) { - return false; - } - - List<Output> oldOutputs = oldProcessOutputs.getOutputs(); - List<Output> newOutputs = newProcessOutputs.getOutputs(); - - return oldOutputs.size() == newOutputs.size() - && oldOutputs.containsAll(newOutputs) - && newOutputs.containsAll(oldOutputs); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/53bd6c38/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java.orig ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java.orig b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java.orig deleted file mode 100644 index 30eeaa4..0000000 --- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java.orig +++ /dev/null @@ -1,1107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.metadata; - -import com.tinkerpop.blueprints.Direction; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Graph; -import com.tinkerpop.blueprints.GraphQuery; -import com.tinkerpop.blueprints.Vertex; -import org.apache.falcon.FalconException; -import org.apache.falcon.cluster.util.EntityBuilderTestUtil; -import org.apache.falcon.entity.Storage; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.CatalogTable; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.entity.v0.feed.Locations; -import org.apache.falcon.entity.v0.process.EngineType; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Inputs; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Outputs; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.retention.EvictedInstanceSerDe; -import org.apache.falcon.security.CurrentUser; -import org.apache.falcon.service.Services; -import org.apache.falcon.util.StartupProperties; -import org.apache.falcon.workflow.WorkflowExecutionArgs; -import org.apache.falcon.workflow.WorkflowExecutionContext; -import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations; -import org.apache.falcon.workflow.WorkflowJobEndNotificationService; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -/** - * Test for Metadata relationship mapping service. - */ -public class MetadataMappingServiceTest { - - public static final String FALCON_USER = "falcon-user"; - private static final String LOGS_DIR = "/falcon/staging/feed/logs"; - private static final String NOMINAL_TIME = "2014-01-01-01-00"; - - public static final String CLUSTER_ENTITY_NAME = "primary-cluster"; - public static final String BCP_CLUSTER_ENTITY_NAME = "bcp-cluster"; - public static final String PROCESS_ENTITY_NAME = "sample-process"; - public static final String COLO_NAME = "west-coast"; - public static final String GENERATE_WORKFLOW_NAME = "imp-click-join-workflow"; - public static final String REPLICATION_WORKFLOW_NAME = "replication-policy-workflow"; - private static final String EVICTION_WORKFLOW_NAME = "eviction-policy-workflow"; - public static final String WORKFLOW_VERSION = "1.0.9"; - - public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed"; - public static final String INPUT_INSTANCE_PATHS = - "jail://global:00/falcon/impression-feed/2014/01/01,jail://global:00/falcon/impression-feed/2014/01/02" - + "#jail://global:00/falcon/clicks-feed/2014-01-01"; - public static final String INPUT_INSTANCE_PATHS_NO_DATE = - "jail://global:00/falcon/impression-feed,jail://global:00/falcon/impression-feed" - + "#jail://global:00/falcon/clicks-feed"; - - public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2"; - public static final String OUTPUT_INSTANCE_PATHS = - "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101"; - private static final String REPLICATED_FEED = "raw-click"; - private static final String EVICTED_FEED = "imp-click-join1"; - private static final String EVICTED_INSTANCE_PATHS = - "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102"; - public static final String OUTPUT_INSTANCE_PATHS_NO_DATE = - "jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2"; - - public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory"; - - private ConfigurationStore configStore; - private MetadataMappingService service; - - private Cluster clusterEntity; - private Cluster anotherCluster; - private List<Feed> inputFeeds = new ArrayList<Feed>(); - private List<Feed> outputFeeds = new ArrayList<Feed>(); - private Process processEntity; - - @BeforeClass - public void setUp() throws Exception { - CurrentUser.authenticate(FALCON_USER); - - configStore = ConfigurationStore.get(); - - Services.get().register(new WorkflowJobEndNotificationService()); - StartupProperties.get().setProperty("falcon.graph.storage.directory", - "target/graphdb-" + System.currentTimeMillis()); - StartupProperties.get().setProperty("falcon.graph.preserve.history", "true"); - service = new MetadataMappingService(); - service.init(); - - Set<String> vertexPropertyKeys = service.getVertexIndexedKeys(); - System.out.println("Got vertex property keys: " + vertexPropertyKeys); - - Set<String> edgePropertyKeys = service.getEdgeIndexedKeys(); - System.out.println("Got edge property keys: " + edgePropertyKeys); - } - - @AfterClass - public void tearDown() throws Exception { - GraphUtils.dump(service.getGraph(), System.out); - - cleanUp(); - StartupProperties.get().setProperty("falcon.graph.preserve.history", "false"); - } - - @AfterMethod - public void printGraph() throws Exception { - GraphUtils.dump(service.getGraph()); - } - - private GraphQuery getQuery() { - return service.getGraph().query(); - } - - @Test - public void testGetName() throws Exception { - Assert.assertEquals(service.getName(), MetadataMappingService.SERVICE_NAME); - } - - @Test - public void testOnAddClusterEntity() throws Exception { - clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, - "classification=production"); - - verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY); - verifyClusterEntityEdges(); - - Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag - Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag - } - - @Test (dependsOnMethods = "testOnAddClusterEntity") - public void testOnAddFeedEntity() throws Exception { - Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity, - "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}"); - inputFeeds.add(impressionsFeed); - verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY); - verifyFeedEntityEdges(impressionsFeed.getName(), "Secure", "analytics"); - Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user - Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user - - Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity, - "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}"); - inputFeeds.add(clicksFeed); - verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY); - Assert.assertEquals(getVerticesCount(service.getGraph()), 9); // feed and financial vertex - Assert.assertEquals(getEdgesCount(service.getGraph()), 11); // +5 = cluster + user + 2Group + Tag - - Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity, - "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}"); - outputFeeds.add(join1Feed); - verifyEntityWasAddedToGraph(join1Feed.getName(), RelationshipType.FEED_ENTITY); - Assert.assertEquals(getVerticesCount(service.getGraph()), 12); // + 3 = 1 feed and 2 groups - Assert.assertEquals(getEdgesCount(service.getGraph()), 16); // +5 = cluster + user + - // Group + 2Tags - - Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity, - "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}"); - outputFeeds.add(join2Feed); - verifyEntityWasAddedToGraph(join2Feed.getName(), RelationshipType.FEED_ENTITY); - - Assert.assertEquals(getVerticesCount(service.getGraph()), 13); // +1 feed - // +6 = user + 2tags + 2Groups + Cluster - Assert.assertEquals(getEdgesCount(service.getGraph()), 22); - } - - @Test (dependsOnMethods = "testOnAddFeedEntity") - public void testOnAddProcessEntity() throws Exception { - processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION, inputFeeds, outputFeeds); - - verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY); - verifyProcessEntityEdges(); - - // +4 = 1 process + 1 tag + 2 pipeline - Assert.assertEquals(getVerticesCount(service.getGraph()), 17); - // +9 = user,tag,cluster, 2 inputs,2 outputs, 2 pipelines - Assert.assertEquals(getEdgesCount(service.getGraph()), 31); - } - - @Test (dependsOnMethods = "testOnAddProcessEntity") - public void testOnAdd() throws Exception { - verifyEntityGraph(RelationshipType.FEED_ENTITY, "Secure"); - } - - @Test - public void testMapLineage() throws Exception { - setup(); - - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null) - , WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName()); - - // +6 = 1 process, 2 inputs = 3 instances,2 outputs - Assert.assertEquals(getVerticesCount(service.getGraph()), 23); - //+40 = +26 for feed instances + 8 for process instance + 6 for second feed instance - Assert.assertEquals(getEdgesCount(service.getGraph()), 71); - } - - @Test - public void testLineageForNoDateInFeedPath() throws Exception { - setupForNoDateInFeedPath(); - - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, - OUTPUT_INSTANCE_PATHS_NO_DATE, INPUT_INSTANCE_PATHS_NO_DATE, null), - WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - - // Verify if instance name has nominal time - List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser( - RelationshipType.FEED_INSTANCE.getName()); - List<String> expected = Arrays.asList("impression-feed/2014-01-01T01:00Z", "clicks-feed/2014-01-01T01:00Z", - "imp-click-join1/2014-01-01T01:00Z", "imp-click-join2/2014-01-01T01:00Z"); - Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected)); - - // +5 = 1 process, 2 inputs, 2 outputs - Assert.assertEquals(getVerticesCount(service.getGraph()), 22); - //+34 = +26 for feed instances + 8 for process instance - Assert.assertEquals(getEdgesCount(service.getGraph()), 65); - } - - @Test - public void testLineageForReplication() throws Exception { - setupForLineageReplication(); - - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED, - "jail://global:00/falcon/raw-click/bcp/20140101", - "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED), - WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - - verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED, - "jail://global:00/falcon/raw-click/bcp/20140101", context, - RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE); - - // +6 [primary, bcp cluster] = cluster, colo, tag, - // +4 [input feed] = feed, tag, group, user - // +4 [output feed] = 1 feed + 1 tag + 2 groups - // +4 [process] = 1 process + 1 tag + 2 pipeline - // +3 = 1 process, 1 input, 1 output - Assert.assertEquals(getVerticesCount(service.getGraph()), 21); - // +4 [cluster] = cluster to colo and tag [primary and bcp], - // +4 [input feed] = cluster, tag, group, user - // +5 [output feed] = cluster + user + Group + 2Tags - // +7 = user,tag,cluster, 1 input,1 output, 2 pipelines - // +19 = +6 for output feed instances + 7 for process instance + 6 for input feed instance - // +1 for replicated-to edge to target cluster for each output feed instance - Assert.assertEquals(getEdgesCount(service.getGraph()), 40); - } - - @Test - public void testLineageForReplicationForNonGeneratedInstances() throws Exception { - cleanUp(); - service.init(); - addClusterAndFeedForReplication(inputFeeds); - // Get the vertices before running replication WF - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED, - "jail://global:00/falcon/raw-click/bcp/20140101", - "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED), - WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - - verifyFeedEntityEdges(REPLICATED_FEED, "Secure", "analytics"); - verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED, - "jail://global:00/falcon/raw-click/bcp/20140101", context, - RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE); - - // +1 for the new instance vertex added - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1); - // +6 = instance-of, stored-in, owned-by, classification, group, replicated-to - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6); - } - - @Test - public void testLineageForRetention() throws Exception { - setupForLineageEviction(); - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.DELETE, EVICTION_WORKFLOW_NAME, - EVICTED_FEED, EVICTED_INSTANCE_PATHS, "IGNORE", EVICTED_FEED), - WorkflowExecutionContext.Type.POST_PROCESSING); - - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - List<String> expectedFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z", - "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z"); - List<String> secureFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z", - "clicks-feed/2014-01-01T00:00Z"); - List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z", - "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z"); - verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds, ownedAndSecureFeeds); - String[] paths = EVICTED_INSTANCE_PATHS.split(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR); - for (String feedInstanceDataPath : paths) { - verifyLineageGraphForReplicationOrEviction(EVICTED_FEED, feedInstanceDataPath, context, - RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE); - } - - // No new vertices added - Assert.assertEquals(getVerticesCount(service.getGraph()), 23); - // +1 = +2 for evicted-from edge from Feed Instance vertex to cluster. - // -1 imp-click-join1 is added twice instead of imp-click-join2 so there is one less edge as there is no - // classified-as -> Secure edge. - Assert.assertEquals(getEdgesCount(service.getGraph()), 72); - } - - @Test - public void testLineageForRetentionWithNoFeedsEvicted() throws Exception { - cleanUp(); - service.init(); - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.DELETE, EVICTION_WORKFLOW_NAME, - EVICTED_FEED, "IGNORE", "IGNORE", EVICTED_FEED), - WorkflowExecutionContext.Type.POST_PROCESSING); - - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - // No new vertices added - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount); - // No new edges added - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount); - } - - @Test (dependsOnMethods = "testOnAdd") - public void testOnChange() throws Exception { - // shutdown the graph and resurrect for testing - service.destroy(); - service.init(); - - // cannot modify cluster, adding a new cluster - anotherCluster = addClusterEntity("another-cluster", "east-coast", - "classification=another"); - verifyEntityWasAddedToGraph("another-cluster", RelationshipType.CLUSTER_ENTITY); - - Assert.assertEquals(getVerticesCount(service.getGraph()), 20); // +3 = cluster, colo, tag - // +2 edges to above, no user but only to colo and new tag - Assert.assertEquals(getEdgesCount(service.getGraph()), 33); - } - - @Test(dependsOnMethods = "testOnChange") - public void testOnFeedEntityChange() throws Exception { - Feed oldFeed = inputFeeds.get(0); - Feed newFeed = EntityBuilderTestUtil.buildFeed(oldFeed.getName(), clusterEntity, - "classified-as=Secured,source=data-warehouse", "reporting"); - addStorage(newFeed, Storage.TYPE.FILESYSTEM, - "jail://global:00/falcon/impression-feed/20140101"); - - try { - configStore.initiateUpdate(newFeed); - - // add cluster - org.apache.falcon.entity.v0.feed.Cluster feedCluster = - new org.apache.falcon.entity.v0.feed.Cluster(); - feedCluster.setName(anotherCluster.getName()); - newFeed.getClusters().getClusters().add(feedCluster); - - configStore.update(EntityType.FEED, newFeed); - } finally { - configStore.cleanupUpdateInit(); - } - - verifyUpdatedEdges(newFeed); - Assert.assertEquals(getVerticesCount(service.getGraph()), 22); //+2 = 2 new tags - Assert.assertEquals(getEdgesCount(service.getGraph()), 35); // +2 = 1 new cluster, 1 new tag - } - - @Test - public void testLineageForTransactionFailure() throws Exception { - clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, - "classification=production"); - verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY); - verifyClusterEntityEdges(); - Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag - Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag - - Feed feed = EntityBuilderTestUtil.buildFeed("feed-name", new Cluster[]{clusterEntity}, null, null); - inputFeeds.add(feed); - outputFeeds.add(feed); - - try { - processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION, inputFeeds, outputFeeds); - Assert.fail(); - } catch (FalconException e) { - Assert.assertEquals(getVerticesCount(service.getGraph()), 3); - Assert.assertEquals(getEdgesCount(service.getGraph()), 2); - } - - } - - private void verifyUpdatedEdges(Feed newFeed) { - Vertex feedVertex = getEntityVertex(newFeed.getName(), RelationshipType.FEED_ENTITY); - - // groups - Edge edge = feedVertex.getEdges(Direction.OUT, RelationshipLabel.GROUPS.getName()).iterator().next(); - Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "reporting"); - - // tags - edge = feedVertex.getEdges(Direction.OUT, "classified-as").iterator().next(); - Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "Secured"); - edge = feedVertex.getEdges(Direction.OUT, "source").iterator().next(); - Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "data-warehouse"); - - // new cluster - List<String> actual = new ArrayList<String>(); - for (Edge clusterEdge : feedVertex.getEdges(Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName())) { - actual.add(clusterEdge.getVertex(Direction.IN).<String>getProperty("name")); - } - Assert.assertTrue(actual.containsAll(Arrays.asList("primary-cluster", "another-cluster")), - "Actual does not contain expected: " + actual); - } - - @Test(dependsOnMethods = "testOnFeedEntityChange") - public void testOnProcessEntityChange() throws Exception { - Process oldProcess = processEntity; - Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), anotherCluster, - null, null); - EntityBuilderTestUtil.addProcessWorkflow(newProcess, GENERATE_WORKFLOW_NAME, "2.0.0"); - EntityBuilderTestUtil.addInput(newProcess, inputFeeds.get(0)); - - try { - configStore.initiateUpdate(newProcess); - configStore.update(EntityType.PROCESS, newProcess); - } finally { - configStore.cleanupUpdateInit(); - } - - verifyUpdatedEdges(newProcess); - Assert.assertEquals(getVerticesCount(service.getGraph()), 22); // +0, no net new - Assert.assertEquals(getEdgesCount(service.getGraph()), 29); // -6 = -2 outputs, -1 tag, -1 cluster, -2 pipelines - } - - @Test(dependsOnMethods = "testOnProcessEntityChange") - public void testAreSame() throws Exception { - - Inputs inputs1 = new Inputs(); - Inputs inputs2 = new Inputs(); - Outputs outputs1 = new Outputs(); - Outputs outputs2 = new Outputs(); - // return true when both are null - Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2)); - Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2)); - - Input i1 = new Input(); - i1.setName("input1"); - Input i2 = new Input(); - i2.setName("input2"); - Output o1 = new Output(); - o1.setName("output1"); - Output o2 = new Output(); - o2.setName("output2"); - - inputs1.getInputs().add(i1); - Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2)); - outputs1.getOutputs().add(o1); - Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2)); - - inputs2.getInputs().add(i1); - Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2)); - outputs2.getOutputs().add(o1); - Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2)); - } - - private void verifyUpdatedEdges(Process newProcess) { - Vertex processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY); - - // cluster - Edge edge = processVertex.getEdges(Direction.OUT, - RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()).iterator().next(); - Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), anotherCluster.getName()); - - // inputs - edge = processVertex.getEdges(Direction.IN, RelationshipLabel.FEED_PROCESS_EDGE.getName()).iterator().next(); - Assert.assertEquals(edge.getVertex(Direction.OUT).getProperty("name"), - newProcess.getInputs().getInputs().get(0).getFeed()); - - // outputs - for (Edge e : processVertex.getEdges(Direction.OUT, RelationshipLabel.PROCESS_FEED_EDGE.getName())) { - Assert.fail("there should not be any edges to output feeds" + e); - } - } - - public static void debug(final Graph graph) { - System.out.println("*****Vertices of " + graph); - for (Vertex vertex : graph.getVertices()) { - System.out.println(GraphUtils.vertexString(vertex)); - } - - System.out.println("*****Edges of " + graph); - for (Edge edge : graph.getEdges()) { - System.out.println(GraphUtils.edgeString(edge)); - } - } - - private Cluster addClusterEntity(String name, String colo, String tags) throws Exception { - Cluster cluster = EntityBuilderTestUtil.buildCluster(name, colo, tags); - configStore.publish(EntityType.CLUSTER, cluster); - return cluster; - } - - private Feed addFeedEntity(String feedName, Cluster cluster, String tags, String groups, - Storage.TYPE storageType, String uriTemplate) throws Exception { - return addFeedEntity(feedName, new Cluster[]{cluster}, tags, groups, storageType, uriTemplate); - } - - private Feed addFeedEntity(String feedName, Cluster[] clusters, String tags, String groups, - Storage.TYPE storageType, String uriTemplate) throws Exception { - Feed feed = EntityBuilderTestUtil.buildFeed(feedName, clusters, - tags, groups); - addStorage(feed, storageType, uriTemplate); - for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) { - if (feedCluster.getName().equals(BCP_CLUSTER_ENTITY_NAME)) { - feedCluster.setType(ClusterType.TARGET); - } - } - configStore.publish(EntityType.FEED, feed); - return feed; - } - - //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck - public Process addProcessEntity(String processName, Cluster cluster, - String tags, String pipelineTags, String workflowName, - String version, List<Feed> inFeeds, List<Feed> outFeeds) throws Exception { - Process process = EntityBuilderTestUtil.buildProcess(processName, cluster, - tags, pipelineTags); - EntityBuilderTestUtil.addProcessWorkflow(process, workflowName, version); - - for (Feed inputFeed : inFeeds) { - EntityBuilderTestUtil.addInput(process, inputFeed); - } - - for (Feed outputFeed : outFeeds) { - EntityBuilderTestUtil.addOutput(process, outputFeed); - } - - configStore.publish(EntityType.PROCESS, process); - return process; - } - //RESUME CHECKSTYLE CHECK ParameterNumberCheck - - private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) { - if (storageType == Storage.TYPE.FILESYSTEM) { - Locations locations = new Locations(); - feed.setLocations(locations); - - Location location = new Location(); - location.setType(LocationType.DATA); - location.setPath(uriTemplate); - feed.getLocations().getLocations().add(location); - } else { - CatalogTable table = new CatalogTable(); - table.setUri(uriTemplate); - feed.setTable(table); - } - } - - private static void addStorage(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed, - Storage.TYPE storageType, String uriTemplate) { - if (storageType == Storage.TYPE.FILESYSTEM) { - Locations locations = new Locations(); - feed.setLocations(locations); - - Location location = new Location(); - location.setType(LocationType.DATA); - location.setPath(uriTemplate); - cluster.setLocations(new Locations()); - cluster.getLocations().getLocations().add(location); - } else { - CatalogTable table = new CatalogTable(); - table.setUri(uriTemplate); - cluster.setTable(table); - } - } - - private void verifyEntityWasAddedToGraph(String entityName, RelationshipType entityType) { - Vertex entityVertex = getEntityVertex(entityName, entityType); - Assert.assertNotNull(entityVertex); - verifyEntityProperties(entityVertex, entityName, entityType); - } - - private void verifyEntityProperties(Vertex entityVertex, String entityName, RelationshipType entityType) { - Assert.assertEquals(entityName, entityVertex.getProperty(RelationshipProperty.NAME.getName())); - Assert.assertEquals(entityType.getName(), entityVertex.getProperty(RelationshipProperty.TYPE.getName())); - Assert.assertNotNull(entityVertex.getProperty(RelationshipProperty.TIMESTAMP.getName())); - } - - private void verifyClusterEntityEdges() { - Vertex clusterVertex = getEntityVertex(CLUSTER_ENTITY_NAME, - RelationshipType.CLUSTER_ENTITY); - - // verify edge to user vertex - verifyVertexForEdge(clusterVertex, Direction.OUT, RelationshipLabel.USER.getName(), - FALCON_USER, RelationshipType.USER.getName()); - // verify edge to colo vertex - verifyVertexForEdge(clusterVertex, Direction.OUT, RelationshipLabel.CLUSTER_COLO.getName(), - COLO_NAME, RelationshipType.COLO.getName()); - // verify edge to tags vertex - verifyVertexForEdge(clusterVertex, Direction.OUT, "classification", - "production", RelationshipType.TAGS.getName()); - } - - private void verifyFeedEntityEdges(String feedName, String tag, String group) { - Vertex feedVertex = getEntityVertex(feedName, RelationshipType.FEED_ENTITY); - - // verify edge to cluster vertex - verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName(), - CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY.getName()); - // verify edge to user vertex - verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.USER.getName(), - FALCON_USER, RelationshipType.USER.getName()); - - // verify edge to tags vertex - verifyVertexForEdge(feedVertex, Direction.OUT, "classified-as", - tag, RelationshipType.TAGS.getName()); - // verify edge to group vertex - verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.GROUPS.getName(), - group, RelationshipType.GROUPS.getName()); - } - - private void verifyProcessEntityEdges() { - Vertex processVertex = getEntityVertex(PROCESS_ENTITY_NAME, - RelationshipType.PROCESS_ENTITY); - - // verify edge to cluster vertex - verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName(), - CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY.getName()); - // verify edge to user vertex - verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.USER.getName(), - FALCON_USER, RelationshipType.USER.getName()); - // verify edge to tags vertex - verifyVertexForEdge(processVertex, Direction.OUT, "classified-as", - "Critical", RelationshipType.TAGS.getName()); - - // verify edges to inputs - List<String> actual = new ArrayList<String>(); - for (Edge edge : processVertex.getEdges(Direction.IN, - RelationshipLabel.FEED_PROCESS_EDGE.getName())) { - Vertex outVertex = edge.getVertex(Direction.OUT); - Assert.assertEquals(RelationshipType.FEED_ENTITY.getName(), - outVertex.getProperty(RelationshipProperty.TYPE.getName())); - actual.add(outVertex.<String>getProperty(RelationshipProperty.NAME.getName())); - } - - Assert.assertTrue(actual.containsAll(Arrays.asList("impression-feed", "clicks-feed")), - "Actual does not contain expected: " + actual); - - actual.clear(); - // verify edges to outputs - for (Edge edge : processVertex.getEdges(Direction.OUT, - RelationshipLabel.PROCESS_FEED_EDGE.getName())) { - Vertex outVertex = edge.getVertex(Direction.IN); - Assert.assertEquals(RelationshipType.FEED_ENTITY.getName(), - outVertex.getProperty(RelationshipProperty.TYPE.getName())); - actual.add(outVertex.<String>getProperty(RelationshipProperty.NAME.getName())); - } - Assert.assertTrue(actual.containsAll(Arrays.asList("imp-click-join1", "imp-click-join2")), - "Actual does not contain expected: " + actual); - } - - private Vertex getEntityVertex(String entityName, RelationshipType entityType) { - GraphQuery entityQuery = getQuery() - .has(RelationshipProperty.NAME.getName(), entityName) - .has(RelationshipProperty.TYPE.getName(), entityType.getName()); - Iterator<Vertex> iterator = entityQuery.vertices().iterator(); - Assert.assertTrue(iterator.hasNext()); - - Vertex entityVertex = iterator.next(); - Assert.assertNotNull(entityVertex); - - return entityVertex; - } - - private void verifyVertexForEdge(Vertex fromVertex, Direction direction, String label, - String expectedName, String expectedType) { - for (Edge edge : fromVertex.getEdges(direction, label)) { - Vertex outVertex = edge.getVertex(Direction.IN); - Assert.assertEquals( - outVertex.getProperty(RelationshipProperty.NAME.getName()), expectedName); - Assert.assertEquals( - outVertex.getProperty(RelationshipProperty.TYPE.getName()), expectedType); - } - } - - private void verifyEntityGraph(RelationshipType feedType, String classification) { - // feeds owned by a user - List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType.getName()); - Assert.assertEquals(feedNamesOwnedByUser, - Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1", - "imp-click-join2") - ); - - // feeds classified as secure - verifyFeedsClassifiedAsSecure(feedType.getName(), - Arrays.asList("impression-feed", "clicks-feed", "imp-click-join2")); - - // feeds owned by a user and classified as secure - verifyFeedsOwnedByUserAndClassification(feedType.getName(), classification, - Arrays.asList("impression-feed", "clicks-feed", "imp-click-join2")); - } - - private List<String> getFeedsOwnedByAUser(String feedType) { - GraphQuery userQuery = getQuery() - .has(RelationshipProperty.NAME.getName(), FALCON_USER) - .has(RelationshipProperty.TYPE.getName(), RelationshipType.USER.getName()); - - List<String> feedNames = new ArrayList<String>(); - for (Vertex userVertex : userQuery.vertices()) { - for (Vertex feed : userVertex.getVertices(Direction.IN, RelationshipLabel.USER.getName())) { - if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) { - System.out.println(FALCON_USER + " owns -> " + GraphUtils.vertexString(feed)); - feedNames.add(feed.<String>getProperty(RelationshipProperty.NAME.getName())); - } - } - } - - return feedNames; - } - - private void verifyFeedsClassifiedAsSecure(String feedType, List<String> expected) { - GraphQuery classQuery = getQuery() - .has(RelationshipProperty.NAME.getName(), "Secure") - .has(RelationshipProperty.TYPE.getName(), RelationshipType.TAGS.getName()); - - List<String> actual = new ArrayList<String>(); - for (Vertex feedVertex : classQuery.vertices()) { - for (Vertex feed : feedVertex.getVertices(Direction.BOTH, "classified-as")) { - if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) { - System.out.println(" Secure classification -> " + GraphUtils.vertexString(feed)); - actual.add(feed.<String>getProperty(RelationshipProperty.NAME.getName())); - } - } - } - - Assert.assertTrue(actual.containsAll(expected), "Actual does not contain expected: " + actual); - } - - private void verifyFeedsOwnedByUserAndClassification(String feedType, String classification, - List<String> expected) { - List<String> actual = new ArrayList<String>(); - Vertex userVertex = getEntityVertex(FALCON_USER, RelationshipType.USER); - for (Vertex feed : userVertex.getVertices(Direction.IN, RelationshipLabel.USER.getName())) { - if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) { - for (Vertex classVertex : feed.getVertices(Direction.OUT, "classified-as")) { - if (classVertex.getProperty(RelationshipProperty.NAME.getName()) - .equals(classification)) { - actual.add(feed.<String>getProperty(RelationshipProperty.NAME.getName())); - System.out.println(classification + " feed owned by falcon-user -> " - + GraphUtils.vertexString(feed)); - } - } - } - } - Assert.assertTrue(actual.containsAll(expected), - "Actual does not contain expected: " + actual); - } - - public long getVerticesCount(final Graph graph) { - long count = 0; - for (Vertex ignored : graph.getVertices()) { - count++; - } - - return count; - } - - public long getEdgesCount(final Graph graph) { - long count = 0; - for (Edge ignored : graph.getEdges()) { - count++; - } - - return count; - } - - private void verifyLineageGraph(String feedType) { - List<String> expectedFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z", - "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"); - List<String> secureFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z", - "clicks-feed/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"); - List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z", - "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"); - verifyLineageGraph(feedType, expectedFeeds, secureFeeds, ownedAndSecureFeeds); - } - - private void verifyLineageGraph(String feedType, List<String> expectedFeeds, - List<String> secureFeeds, List<String> ownedAndSecureFeeds) { - // feeds owned by a user - List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType); - Assert.assertTrue(feedNamesOwnedByUser.containsAll(expectedFeeds)); - - Graph graph = service.getGraph(); - - Iterator<Vertex> vertices = graph.getVertices("name", "impression-feed/2014-01-01T00:00Z").iterator(); - Assert.assertTrue(vertices.hasNext()); - Vertex feedInstanceVertex = vertices.next(); - Assert.assertEquals(feedInstanceVertex.getProperty(RelationshipProperty.TYPE.getName()), - RelationshipType.FEED_INSTANCE.getName()); - - Object vertexId = feedInstanceVertex.getId(); - Vertex vertexById = graph.getVertex(vertexId); - Assert.assertEquals(vertexById, feedInstanceVertex); - - // feeds classified as secure - verifyFeedsClassifiedAsSecure(feedType, secureFeeds); - - // feeds owned by a user and classified as secure - verifyFeedsOwnedByUserAndClassification(feedType, "Financial", ownedAndSecureFeeds); - } - - private void verifyLineageGraphForReplicationOrEviction(String feedName, String feedInstanceDataPath, - WorkflowExecutionContext context, - RelationshipLabel edgeLabel) throws Exception { - String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(feedName - , context.getClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601()); - Vertex feedVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE); - - Edge edge = feedVertex.getEdges(Direction.OUT, edgeLabel.getName()) - .iterator().next(); - Assert.assertNotNull(edge); - Assert.assertEquals(edge.getProperty(RelationshipProperty.TIMESTAMP.getName()) - , context.getTimeStampAsISO8601()); - - Vertex clusterVertex = edge.getVertex(Direction.IN); - Assert.assertEquals(clusterVertex.getProperty(RelationshipProperty.NAME.getName()), context.getClusterName()); - } - - private static String[] getTestMessageArgs(EntityOperations operation, String wfName, String outputFeedNames, - String feedInstancePaths, String falconInputPaths, - String falconInputFeeds) { - String cluster; - if (EntityOperations.REPLICATE == operation) { - cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME; - } else { - cluster = CLUSTER_ENTITY_NAME; - } - - return new String[]{ - "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster, - "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"), - "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME, - "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME, - "-" + WorkflowExecutionArgs.OPERATION.getName(), operation.toString(), - - "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), - (falconInputFeeds != null ? falconInputFeeds : INPUT_FEED_NAMES), - "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), - (falconInputPaths != null ? falconInputPaths : INPUT_INSTANCE_PATHS), - - "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), - (outputFeedNames != null ? outputFeedNames : OUTPUT_FEED_NAMES), - "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), - (feedInstancePaths != null ? feedInstancePaths : OUTPUT_INSTANCE_PATHS), - - "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00", - "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER, - "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1", - "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED", - "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME, - - "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie", - "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id", - "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), wfName, - "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION, - "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(), - - "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER, - "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true", - "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER, - "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true", - "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000", - - "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR, - }; - } - - private void setup() throws Exception { - cleanUp(); - service.init(); - - // Add cluster - clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, - "classification=production"); - - addFeedsAndProcess(clusterEntity); - } - - private void addFeedsAndProcess(Cluster cluster) throws Exception { - // Add input and output feeds - Feed impressionsFeed = addFeedEntity("impression-feed", cluster, - "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}"); - List<Feed> inFeeds = new ArrayList<>(); - List<Feed> outFeeds = new ArrayList<>(); - inFeeds.add(impressionsFeed); - Feed clicksFeed = addFeedEntity("clicks-feed", cluster, - "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}"); - inFeeds.add(clicksFeed); - Feed join1Feed = addFeedEntity("imp-click-join1", cluster, - "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}"); - outFeeds.add(join1Feed); - Feed join2Feed = addFeedEntity("imp-click-join2", cluster, - "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}"); - outFeeds.add(join2Feed); - processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION, inFeeds, outFeeds); - } - - private void setupForLineageReplication() throws Exception { - cleanUp(); - service.init(); - - List<Feed> inFeeds = new ArrayList<>(); - List<Feed> outFeeds = new ArrayList<>(); - - addClusterAndFeedForReplication(inFeeds); - - // Add output feed - Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity, - "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}"); - outFeeds.add(join1Feed); - - processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION, inFeeds, outFeeds); - - // GENERATE WF should have run before this to create all instance related vertices - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1", - "jail://global:00/falcon/imp-click-join1/20140101", - "jail://global:00/falcon/raw-click/primary/20140101", - REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - } - - private void addClusterAndFeedForReplication(List<Feed> inFeeds) throws Exception { - // Add cluster - clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, - "classification=production"); - // Add backup cluster - Cluster bcpCluster = addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp"); - - Cluster[] clusters = {clusterEntity, bcpCluster}; - - // Add feed - Feed rawFeed = addFeedEntity(REPLICATED_FEED, clusters, - "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/raw-click/${YEAR}/${MONTH}/${DAY}"); - // Add uri template for each cluster - for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : rawFeed.getClusters().getClusters()) { - if (feedCluster.getName().equals(CLUSTER_ENTITY_NAME)) { - addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM, - "/falcon/raw-click/primary/${YEAR}/${MONTH}/${DAY}"); - } else { - addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM, - "/falcon/raw-click/bcp/${YEAR}/${MONTH}/${DAY}"); - } - } - - // update config store - try { - configStore.initiateUpdate(rawFeed); - configStore.update(EntityType.FEED, rawFeed); - } finally { - configStore.cleanupUpdateInit(); - } - inFeeds.add(rawFeed); - } - - private void setupForLineageEviction() throws Exception { - setup(); - - // GENERATE WF should have run before this to create all instance related vertices - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, - "imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null, null), - WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - } - - private void setupForNoDateInFeedPath() throws Exception { - cleanUp(); - service.init(); - - // Add cluster - clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, - "classification=production"); - List<Feed> inFeeds = new ArrayList<>(); - List<Feed> outFeeds = new ArrayList<>(); - // Add input and output feeds - Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity, - "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/impression-feed"); - inFeeds.add(impressionsFeed); - Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity, - "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/clicks-feed"); - inFeeds.add(clicksFeed); - Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity, - "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join1"); - outFeeds.add(join1Feed); - Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity, - "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join2"); - outFeeds.add(join2Feed); - processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION, inFeeds, outFeeds); - - } - - private void cleanUp() throws Exception { - cleanupGraphStore(service.getGraph()); - cleanupConfigurationStore(configStore); - service.destroy(); - } - - private void cleanupGraphStore(Graph graph) { - for (Edge edge : graph.getEdges()) { - graph.removeEdge(edge); - } - - for (Vertex vertex : graph.getVertices()) { - graph.removeVertex(vertex); - } - - graph.shutdown(); - } - - private static void cleanupConfigurationStore(ConfigurationStore store) throws Exception { - for (EntityType type : EntityType.values()) { - Collection<String> entities = store.getEntities(type); - for (String entity : entities) { - store.remove(type, entity); - } - } - } -}
