http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java deleted file mode 100644 index 29f933d..0000000 --- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java +++ /dev/null @@ -1,1228 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.metadata; - -import com.tinkerpop.blueprints.Direction; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Graph; -import com.tinkerpop.blueprints.GraphQuery; -import com.tinkerpop.blueprints.Vertex; -import org.apache.falcon.FalconException; -import org.apache.falcon.cluster.util.EntityBuilderTestUtil; -import org.apache.falcon.entity.Storage; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.CatalogTable; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.entity.v0.feed.Locations; -import org.apache.falcon.entity.v0.process.EngineType; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Inputs; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Outputs; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.retention.EvictedInstanceSerDe; -import org.apache.falcon.security.CurrentUser; -import org.apache.falcon.service.Services; -import org.apache.falcon.util.StartupProperties; -import org.apache.falcon.workflow.WorkflowExecutionArgs; -import org.apache.falcon.workflow.WorkflowExecutionContext; -import org.apache.falcon.workflow.WorkflowJobEndNotificationService; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations; - -/** - * Test for Metadata relationship mapping service. - */ -public class MetadataMappingServiceTest { - - public static final String FALCON_USER = "falcon-user"; - private static final String LOGS_DIR = "jail://global:00/falcon/staging/feed/logs"; - private static final String NOMINAL_TIME = "2014-01-01-01-00"; - - public static final String CLUSTER_ENTITY_NAME = "primary-cluster"; - public static final String BCP_CLUSTER_ENTITY_NAME = "bcp-cluster"; - public static final String PROCESS_ENTITY_NAME = "sample-process"; - public static final String COLO_NAME = "west-coast"; - public static final String GENERATE_WORKFLOW_NAME = "imp-click-join-workflow"; - public static final String REPLICATION_WORKFLOW_NAME = "replication-policy-workflow"; - private static final String EVICTION_WORKFLOW_NAME = "eviction-policy-workflow"; - public static final String WORKFLOW_VERSION = "1.0.9"; - - public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed"; - public static final String INPUT_INSTANCE_PATHS = - "jail://global:00/falcon/impression-feed/2014/01/01,jail://global:00/falcon/impression-feed/2014/01/02" - + "#jail://global:00/falcon/clicks-feed/2014-01-01"; - public static final String INPUT_INSTANCE_PATHS_NO_DATE = - "jail://global:00/falcon/impression-feed,jail://global:00/falcon/impression-feed" - + "#jail://global:00/falcon/clicks-feed"; - - public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2"; - public static final String OUTPUT_INSTANCE_PATHS = - "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101"; - private static final String REPLICATED_FEED = "raw-click"; - private static final String EVICTED_FEED = "imp-click-join1"; - private static final String EVICTED_INSTANCE_PATHS = - "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102"; - public static final String OUTPUT_INSTANCE_PATHS_NO_DATE = - "jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2"; - public static final String COUNTERS = "TIMETAKEN:36956,COPY:30,BYTESCOPIED:1000"; - - public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory"; - - private ConfigurationStore configStore; - private MetadataMappingService service; - - private Cluster clusterEntity; - private Cluster anotherCluster; - private List<Feed> inputFeeds = new ArrayList<>(); - private List<Feed> outputFeeds = new ArrayList<>(); - private Process processEntity; - - @BeforeClass - public void setUp() throws Exception { - CurrentUser.authenticate(FALCON_USER); - - configStore = ConfigurationStore.get(); - - Services.get().register(new WorkflowJobEndNotificationService()); - StartupProperties.get().setProperty("falcon.graph.storage.directory", - "target/graphdb-" + System.currentTimeMillis()); - StartupProperties.get().setProperty("falcon.graph.preserve.history", "true"); - service = new MetadataMappingService(); - service.init(); - - Set<String> vertexPropertyKeys = service.getVertexIndexedKeys(); - System.out.println("Got vertex property keys: " + vertexPropertyKeys); - - Set<String> edgePropertyKeys = service.getEdgeIndexedKeys(); - System.out.println("Got edge property keys: " + edgePropertyKeys); - } - - @AfterClass - public void tearDown() throws Exception { - GraphUtils.dump(service.getGraph(), System.out); - - cleanUp(); - StartupProperties.get().setProperty("falcon.graph.preserve.history", "false"); - } - - @AfterMethod - public void printGraph() throws Exception { - GraphUtils.dump(service.getGraph()); - } - - private GraphQuery getQuery() { - return service.getGraph().query(); - } - - @Test - public void testGetName() throws Exception { - Assert.assertEquals(service.getName(), MetadataMappingService.SERVICE_NAME); - } - - @Test - public void testOnAddClusterEntity() throws Exception { - // Get the before vertices and edges - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, - "classification=production"); - - verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY); - verifyClusterEntityEdges(); - - // +4 = cluster, colo, tag, user - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 4); - // +3 = cluster to colo, user and tag - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 3); - } - - @Test (dependsOnMethods = "testOnAddClusterEntity") - public void testOnAddFeedEntity() throws Exception { - // Get the before vertices and edges - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - - Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity, - "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}"); - inputFeeds.add(impressionsFeed); - verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY); - verifyFeedEntityEdges(impressionsFeed.getName(), "Secure", "analytics"); - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3); // +3 = feed, tag, group, - // user - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 4); // +4 = cluster, tag, group, user - - // Get the before vertices and edges - beforeVerticesCount = getVerticesCount(service.getGraph()); - beforeEdgesCount = getEdgesCount(service.getGraph()); - Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity, - "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}"); - inputFeeds.add(clicksFeed); - verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY); - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 2); // feed and financial vertex - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 5); // +5 = cluster + user + 2Group - // + Tag - - // Get the before vertices and edges - beforeVerticesCount = getVerticesCount(service.getGraph()); - beforeEdgesCount = getEdgesCount(service.getGraph()); - Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity, - "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}"); - outputFeeds.add(join1Feed); - verifyEntityWasAddedToGraph(join1Feed.getName(), RelationshipType.FEED_ENTITY); - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3); // + 3 = 1 feed and 2 - // groups - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 5); // +5 = cluster + user + - // Group + 2Tags - - // Get the before vertices and edges - beforeVerticesCount = getVerticesCount(service.getGraph()); - beforeEdgesCount = getEdgesCount(service.getGraph()); - Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity, - "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}"); - outputFeeds.add(join2Feed); - verifyEntityWasAddedToGraph(join2Feed.getName(), RelationshipType.FEED_ENTITY); - - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1); // +1 feed - // +6 = user + 2tags + 2Groups + Cluster - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6); - } - - @Test (dependsOnMethods = "testOnAddFeedEntity") - public void testOnAddProcessEntity() throws Exception { - // Get the before vertices and edges - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - - processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION, inputFeeds, outputFeeds); - - verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY); - verifyProcessEntityEdges(); - - // +4 = 1 process + 1 tag + 2 pipeline - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 4); - // +9 = user,tag,cluster, 2 inputs,2 outputs, 2 pipelines - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 9); - } - - @Test (dependsOnMethods = "testOnAddProcessEntity") - public void testOnAdd() throws Exception { - verifyEntityGraph(RelationshipType.FEED_ENTITY, "Secure"); - } - - @Test - public void testMapLineage() throws Exception { - setup(); - - // Get the before vertices and edges - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, null, null, null) - , WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName()); - - // +6 = 1 process, 2 inputs = 3 instances,2 outputs - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 6); - //+40 = +26 for feed instances + 8 for process instance + 6 for second feed instance - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 40); - } - - @Test - public void testLineageForNoDateInFeedPath() throws Exception { - setupForNoDateInFeedPath(); - - // Get the before vertices and edges - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, null, - OUTPUT_INSTANCE_PATHS_NO_DATE, INPUT_INSTANCE_PATHS_NO_DATE, null), - WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - - // Verify if instance name has nominal time - List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser( - RelationshipType.FEED_INSTANCE.getName()); - List<String> expected = Arrays.asList("impression-feed/2014-01-01T01:00Z", "clicks-feed/2014-01-01T01:00Z", - "imp-click-join1/2014-01-01T01:00Z", "imp-click-join2/2014-01-01T01:00Z"); - Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected)); - - // +5 = 1 process, 2 inputs, 2 outputs - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 5); - //+34 = +26 for feed instances + 8 for process instance - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 34); - } - - @Test - public void testLineageForReplication() throws Exception { - setupForLineageReplication(); - - // Get the before vertices and edges - // +7 [primary, bcp cluster] = cluster, colo, tag, user - // +3 [input feed] = feed, tag, group - // +4 [output feed] = 1 feed + 1 tag + 2 groups - // +4 [process] = 1 process + 1 tag + 2 pipeline - // +3 = 1 process, 1 input, 1 output - long beforeVerticesCount = getVerticesCount(service.getGraph()); - - // +4 [cluster] = cluster to colo and tag [primary and bcp], - // +4 [input feed] = cluster, tag, group, user - // +5 [output feed] = cluster + user + Group + 2Tags - // +7 = user,tag,cluster, 1 input,1 output, 2 pipelines - // +19 = +6 for output feed instances + 7 for process instance + 6 for input feed instance - long beforeEdgesCount = getEdgesCount(service.getGraph()); - - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED, - "jail://global:00/falcon/raw-click/bcp/20140101", - "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED), - WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - - verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED, - "jail://global:00/falcon/raw-click/bcp/20140101", context, - RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE); - - // No new vertex added after replication - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0); - - // +1 for replicated-to edge to target cluster for each output feed instance - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 1); - } - - @Test - public void testLineageForReplicationForNonGeneratedInstances() throws Exception { - cleanUp(); - service.init(); - addClusterAndFeedForReplication(inputFeeds); - // Get the vertices before running replication WF - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED, - "jail://global:00/falcon/raw-click/bcp/20140101", - "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED), - WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - - verifyFeedEntityEdges(REPLICATED_FEED, "Secure", "analytics"); - verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED, - "jail://global:00/falcon/raw-click/bcp/20140101", context, - RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE); - - // +1 for the new instance vertex added - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1); - // +6 = instance-of, stored-in, owned-by, classification, group, replicated-to - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6); - } - - @Test - public void testLineageForRetention() throws Exception { - setupForLineageEviction(); - // Get the before vertices and edges - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.DELETE, EVICTION_WORKFLOW_NAME, - EVICTED_FEED, EVICTED_INSTANCE_PATHS, "IGNORE", EVICTED_FEED), - WorkflowExecutionContext.Type.POST_PROCESSING); - - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - List<String> expectedFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z", - "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z"); - List<String> secureFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z", - "clicks-feed/2014-01-01T00:00Z"); - List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z", - "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z"); - verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds, ownedAndSecureFeeds); - String[] paths = EVICTED_INSTANCE_PATHS.split(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR); - for (String feedInstanceDataPath : paths) { - verifyLineageGraphForReplicationOrEviction(EVICTED_FEED, feedInstanceDataPath, context, - RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE); - } - - // No new vertices added - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0); - // +2 for evicted-from edge from Feed Instance vertex to cluster - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 2); - } - - @Test - public void testLineageForRetentionWithNoFeedsEvicted() throws Exception { - cleanUp(); - service.init(); - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.DELETE, EVICTION_WORKFLOW_NAME, - EVICTED_FEED, "IGNORE", "IGNORE", EVICTED_FEED), - WorkflowExecutionContext.Type.POST_PROCESSING); - - service.onSuccess(context); - - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - // No new vertices added - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount); - // No new edges added - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount); - } - - @Test (dependsOnMethods = "testOnAdd") - public void testOnChange() throws Exception { - // shutdown the graph and resurrect for testing - service.destroy(); - service.init(); - - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - - // cannot modify cluster, adding a new cluster - anotherCluster = addClusterEntity("another-cluster", "east-coast", - "classification=another"); - verifyEntityWasAddedToGraph("another-cluster", RelationshipType.CLUSTER_ENTITY); - - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 3); // +3 = cluster, colo, tag - // +3 edges to user, colo and new tag - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 3); - } - - @Test(dependsOnMethods = "testOnChange") - public void testOnFeedEntityChange() throws Exception { - Feed oldFeed = inputFeeds.get(0); - Feed newFeed = EntityBuilderTestUtil.buildFeed(oldFeed.getName(), clusterEntity, - "classified-as=Secured,source=data-warehouse", "reporting"); - addStorage(newFeed, Storage.TYPE.FILESYSTEM, - "jail://global:00/falcon/impression-feed/20140101"); - - long beforeVerticesCount = 0; - long beforeEdgesCount = 0; - - try { - configStore.initiateUpdate(newFeed); - - beforeVerticesCount = getVerticesCount(service.getGraph()); - beforeEdgesCount = getEdgesCount(service.getGraph()); - - // add cluster - org.apache.falcon.entity.v0.feed.Cluster feedCluster = - new org.apache.falcon.entity.v0.feed.Cluster(); - feedCluster.setName(anotherCluster.getName()); - newFeed.getClusters().getClusters().add(feedCluster); - - configStore.update(EntityType.FEED, newFeed); - } finally { - configStore.cleanupUpdateInit(); - } - - verifyUpdatedEdges(newFeed); - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 2); //+2 = 2 new tags - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 2); // +2 = 1 new cluster, 1 new tag - } - - @Test - public void testLineageForTransactionFailure() throws Exception { - cleanUp(); - service.init(); - clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, - "classification=production"); - verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY); - verifyClusterEntityEdges(); - Assert.assertEquals(getVerticesCount(service.getGraph()), 4); // +3 = cluster, colo, user, tag - Assert.assertEquals(getEdgesCount(service.getGraph()), 3); // +2 = cluster to colo, user and tag - - Feed feed = EntityBuilderTestUtil.buildFeed("feed-name", new Cluster[]{clusterEntity}, null, null); - inputFeeds.add(feed); - outputFeeds.add(feed); - - try { - processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION, inputFeeds, outputFeeds); - Assert.fail(); - } catch (FalconException e) { - Assert.assertEquals(getVerticesCount(service.getGraph()), 4); - Assert.assertEquals(getEdgesCount(service.getGraph()), 3); - } - - } - - private void verifyUpdatedEdges(Feed newFeed) { - Vertex feedVertex = getEntityVertex(newFeed.getName(), RelationshipType.FEED_ENTITY); - - // groups - Edge edge = feedVertex.getEdges(Direction.OUT, RelationshipLabel.GROUPS.getName()).iterator().next(); - Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "reporting"); - - // tags - edge = feedVertex.getEdges(Direction.OUT, "classified-as").iterator().next(); - Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "Secured"); - edge = feedVertex.getEdges(Direction.OUT, "source").iterator().next(); - Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "data-warehouse"); - - // new cluster - List<String> actual = new ArrayList<>(); - for (Edge clusterEdge : feedVertex.getEdges(Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName())) { - actual.add(clusterEdge.getVertex(Direction.IN).<String>getProperty("name")); - } - Assert.assertTrue(actual.containsAll(Arrays.asList("primary-cluster", "another-cluster")), - "Actual does not contain expected: " + actual); - } - - @Test(dependsOnMethods = "testOnFeedEntityChange") - public void testOnProcessEntityChange() throws Exception { - long beforeVerticesCount = getVerticesCount(service.getGraph()); - long beforeEdgesCount = getEdgesCount(service.getGraph()); - - Process oldProcess = processEntity; - Process newProcess = EntityBuilderTestUtil.buildProcess(oldProcess.getName(), anotherCluster, - null, null); - EntityBuilderTestUtil.addProcessWorkflow(newProcess, GENERATE_WORKFLOW_NAME, "2.0.0"); - EntityBuilderTestUtil.addInput(newProcess, inputFeeds.get(0)); - - try { - configStore.initiateUpdate(newProcess); - configStore.update(EntityType.PROCESS, newProcess); - } finally { - configStore.cleanupUpdateInit(); - } - - verifyUpdatedEdges(newProcess); - Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 0); // +0, no net new - Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount - 6); // -6 = -2 outputs, -1 tag, - // -1 cluster, -2 pipelines - } - - @Test(dependsOnMethods = "testOnProcessEntityChange") - public void testAreSame() throws Exception { - - Inputs inputs1 = new Inputs(); - Inputs inputs2 = new Inputs(); - Outputs outputs1 = new Outputs(); - Outputs outputs2 = new Outputs(); - // return true when both are null - Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2)); - Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2)); - - Input i1 = new Input(); - i1.setName("input1"); - Input i2 = new Input(); - i2.setName("input2"); - Output o1 = new Output(); - o1.setName("output1"); - Output o2 = new Output(); - o2.setName("output2"); - - inputs1.getInputs().add(i1); - Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2)); - outputs1.getOutputs().add(o1); - Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2)); - - inputs2.getInputs().add(i1); - Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(inputs1, inputs2)); - outputs2.getOutputs().add(o1); - Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(outputs1, outputs2)); - } - - @Test - public void testLineageForJobCounter() throws Exception { - setupForJobCounters(); - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "IGNORE", "IGNORE", "IGNORE", "NONE"), - WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - debug(service.getGraph()); - GraphUtils.dump(service.getGraph()); - Graph graph = service.getGraph(); - - Vertex vertex = graph.getVertices("name", "sample-process/2014-01-01T01:00Z").iterator().next(); - Assert.assertEquals(vertex.getProperty("TIMETAKEN"), 36956L); - Assert.assertEquals(vertex.getProperty("COPY"), 30L); - Assert.assertEquals(vertex.getProperty("BYTESCOPIED"), 1000L); - Assert.assertEquals(getVerticesCount(service.getGraph()), 9); - Assert.assertEquals(getEdgesCount(service.getGraph()), 14); - verifyLineageGraphForJobCounters(context); - } - - private void verifyUpdatedEdges(Process newProcess) { - Vertex processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY); - - // cluster - Edge edge = processVertex.getEdges(Direction.OUT, - RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()).iterator().next(); - Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), anotherCluster.getName()); - - // inputs - edge = processVertex.getEdges(Direction.IN, RelationshipLabel.FEED_PROCESS_EDGE.getName()).iterator().next(); - Assert.assertEquals(edge.getVertex(Direction.OUT).getProperty("name"), - newProcess.getInputs().getInputs().get(0).getFeed()); - - // outputs - for (Edge e : processVertex.getEdges(Direction.OUT, RelationshipLabel.PROCESS_FEED_EDGE.getName())) { - Assert.fail("there should not be any edges to output feeds" + e); - } - } - - public static void debug(final Graph graph) { - System.out.println("*****Vertices of " + graph); - for (Vertex vertex : graph.getVertices()) { - System.out.println(GraphUtils.vertexString(vertex)); - } - - System.out.println("*****Edges of " + graph); - for (Edge edge : graph.getEdges()) { - System.out.println(GraphUtils.edgeString(edge)); - } - } - - private Cluster addClusterEntity(String name, String colo, String tags) throws Exception { - Cluster cluster = EntityBuilderTestUtil.buildCluster(name, colo, tags); - configStore.publish(EntityType.CLUSTER, cluster); - return cluster; - } - - private Feed addFeedEntity(String feedName, Cluster cluster, String tags, String groups, - Storage.TYPE storageType, String uriTemplate) throws Exception { - return addFeedEntity(feedName, new Cluster[]{cluster}, tags, groups, storageType, uriTemplate); - } - - private Feed addFeedEntity(String feedName, Cluster[] clusters, String tags, String groups, - Storage.TYPE storageType, String uriTemplate) throws Exception { - Feed feed = EntityBuilderTestUtil.buildFeed(feedName, clusters, - tags, groups); - addStorage(feed, storageType, uriTemplate); - for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) { - if (feedCluster.getName().equals(BCP_CLUSTER_ENTITY_NAME)) { - feedCluster.setType(ClusterType.TARGET); - } - } - configStore.publish(EntityType.FEED, feed); - return feed; - } - - //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck - public Process addProcessEntity(String processName, Cluster cluster, - String tags, String pipelineTags, String workflowName, - String version, List<Feed> inFeeds, List<Feed> outFeeds) throws Exception { - Process process = EntityBuilderTestUtil.buildProcess(processName, cluster, - tags, pipelineTags); - EntityBuilderTestUtil.addProcessWorkflow(process, workflowName, version); - - for (Feed inputFeed : inFeeds) { - EntityBuilderTestUtil.addInput(process, inputFeed); - } - - for (Feed outputFeed : outFeeds) { - EntityBuilderTestUtil.addOutput(process, outputFeed); - } - - configStore.publish(EntityType.PROCESS, process); - return process; - } - //RESUME CHECKSTYLE CHECK ParameterNumberCheck - - private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) { - if (storageType == Storage.TYPE.FILESYSTEM) { - Locations locations = new Locations(); - feed.setLocations(locations); - - Location location = new Location(); - location.setType(LocationType.DATA); - location.setPath(uriTemplate); - feed.getLocations().getLocations().add(location); - } else { - CatalogTable table = new CatalogTable(); - table.setUri(uriTemplate); - feed.setTable(table); - } - } - - private static void addStorage(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed, - Storage.TYPE storageType, String uriTemplate) { - if (storageType == Storage.TYPE.FILESYSTEM) { - Locations locations = new Locations(); - feed.setLocations(locations); - - Location location = new Location(); - location.setType(LocationType.DATA); - location.setPath(uriTemplate); - cluster.setLocations(new Locations()); - cluster.getLocations().getLocations().add(location); - } else { - CatalogTable table = new CatalogTable(); - table.setUri(uriTemplate); - cluster.setTable(table); - } - } - - private void verifyEntityWasAddedToGraph(String entityName, RelationshipType entityType) { - Vertex entityVertex = getEntityVertex(entityName, entityType); - Assert.assertNotNull(entityVertex); - verifyEntityProperties(entityVertex, entityName, entityType); - } - - private void verifyEntityProperties(Vertex entityVertex, String entityName, RelationshipType entityType) { - Assert.assertEquals(entityName, entityVertex.getProperty(RelationshipProperty.NAME.getName())); - Assert.assertEquals(entityType.getName(), entityVertex.getProperty(RelationshipProperty.TYPE.getName())); - Assert.assertNotNull(entityVertex.getProperty(RelationshipProperty.TIMESTAMP.getName())); - } - - private void verifyClusterEntityEdges() { - Vertex clusterVertex = getEntityVertex(CLUSTER_ENTITY_NAME, - RelationshipType.CLUSTER_ENTITY); - - // verify edge to user vertex - verifyVertexForEdge(clusterVertex, Direction.OUT, RelationshipLabel.USER.getName(), - FALCON_USER, RelationshipType.USER.getName()); - // verify edge to colo vertex - verifyVertexForEdge(clusterVertex, Direction.OUT, RelationshipLabel.CLUSTER_COLO.getName(), - COLO_NAME, RelationshipType.COLO.getName()); - // verify edge to tags vertex - verifyVertexForEdge(clusterVertex, Direction.OUT, "classification", - "production", RelationshipType.TAGS.getName()); - } - - private void verifyFeedEntityEdges(String feedName, String tag, String group) { - Vertex feedVertex = getEntityVertex(feedName, RelationshipType.FEED_ENTITY); - - // verify edge to cluster vertex - verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName(), - CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY.getName()); - // verify edge to user vertex - verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.USER.getName(), - FALCON_USER, RelationshipType.USER.getName()); - - // verify edge to tags vertex - verifyVertexForEdge(feedVertex, Direction.OUT, "classified-as", - tag, RelationshipType.TAGS.getName()); - // verify edge to group vertex - verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.GROUPS.getName(), - group, RelationshipType.GROUPS.getName()); - } - - private void verifyProcessEntityEdges() { - Vertex processVertex = getEntityVertex(PROCESS_ENTITY_NAME, - RelationshipType.PROCESS_ENTITY); - - // verify edge to cluster vertex - verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.PROCESS_CLUSTER_EDGE.getName(), - CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY.getName()); - // verify edge to user vertex - verifyVertexForEdge(processVertex, Direction.OUT, RelationshipLabel.USER.getName(), - FALCON_USER, RelationshipType.USER.getName()); - // verify edge to tags vertex - verifyVertexForEdge(processVertex, Direction.OUT, "classified-as", - "Critical", RelationshipType.TAGS.getName()); - - // verify edges to inputs - List<String> actual = new ArrayList<>(); - for (Edge edge : processVertex.getEdges(Direction.IN, - RelationshipLabel.FEED_PROCESS_EDGE.getName())) { - Vertex outVertex = edge.getVertex(Direction.OUT); - Assert.assertEquals(RelationshipType.FEED_ENTITY.getName(), - outVertex.getProperty(RelationshipProperty.TYPE.getName())); - actual.add(outVertex.<String>getProperty(RelationshipProperty.NAME.getName())); - } - - Assert.assertTrue(actual.containsAll(Arrays.asList("impression-feed", "clicks-feed")), - "Actual does not contain expected: " + actual); - - actual.clear(); - // verify edges to outputs - for (Edge edge : processVertex.getEdges(Direction.OUT, - RelationshipLabel.PROCESS_FEED_EDGE.getName())) { - Vertex outVertex = edge.getVertex(Direction.IN); - Assert.assertEquals(RelationshipType.FEED_ENTITY.getName(), - outVertex.getProperty(RelationshipProperty.TYPE.getName())); - actual.add(outVertex.<String>getProperty(RelationshipProperty.NAME.getName())); - } - Assert.assertTrue(actual.containsAll(Arrays.asList("imp-click-join1", "imp-click-join2")), - "Actual does not contain expected: " + actual); - } - - private Vertex getEntityVertex(String entityName, RelationshipType entityType) { - GraphQuery entityQuery = getQuery() - .has(RelationshipProperty.NAME.getName(), entityName) - .has(RelationshipProperty.TYPE.getName(), entityType.getName()); - Iterator<Vertex> iterator = entityQuery.vertices().iterator(); - Assert.assertTrue(iterator.hasNext()); - - Vertex entityVertex = iterator.next(); - Assert.assertNotNull(entityVertex); - - return entityVertex; - } - - private void verifyVertexForEdge(Vertex fromVertex, Direction direction, String label, - String expectedName, String expectedType) { - boolean found = false; - for (Edge edge : fromVertex.getEdges(direction, label)) { - found = true; - Vertex outVertex = edge.getVertex(Direction.IN); - Assert.assertEquals( - outVertex.getProperty(RelationshipProperty.NAME.getName()), expectedName); - Assert.assertEquals( - outVertex.getProperty(RelationshipProperty.TYPE.getName()), expectedType); - } - Assert.assertFalse((!found), "Edge not found"); - } - - private void verifyEntityGraph(RelationshipType feedType, String classification) { - // feeds owned by a user - List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType.getName()); - Assert.assertEquals(feedNamesOwnedByUser, - Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1", - "imp-click-join2") - ); - - // feeds classified as secure - verifyFeedsClassifiedAsSecure(feedType.getName(), - Arrays.asList("impression-feed", "clicks-feed", "imp-click-join2")); - - // feeds owned by a user and classified as secure - verifyFeedsOwnedByUserAndClassification(feedType.getName(), classification, - Arrays.asList("impression-feed", "clicks-feed", "imp-click-join2")); - } - - private List<String> getFeedsOwnedByAUser(String feedType) { - GraphQuery userQuery = getQuery() - .has(RelationshipProperty.NAME.getName(), FALCON_USER) - .has(RelationshipProperty.TYPE.getName(), RelationshipType.USER.getName()); - - List<String> feedNames = new ArrayList<>(); - for (Vertex userVertex : userQuery.vertices()) { - for (Vertex feed : userVertex.getVertices(Direction.IN, RelationshipLabel.USER.getName())) { - if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) { - System.out.println(FALCON_USER + " owns -> " + GraphUtils.vertexString(feed)); - feedNames.add(feed.<String>getProperty(RelationshipProperty.NAME.getName())); - } - } - } - - return feedNames; - } - - private void verifyFeedsClassifiedAsSecure(String feedType, List<String> expected) { - GraphQuery classQuery = getQuery() - .has(RelationshipProperty.NAME.getName(), "Secure") - .has(RelationshipProperty.TYPE.getName(), RelationshipType.TAGS.getName()); - - List<String> actual = new ArrayList<>(); - for (Vertex feedVertex : classQuery.vertices()) { - for (Vertex feed : feedVertex.getVertices(Direction.BOTH, "classified-as")) { - if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) { - System.out.println(" Secure classification -> " + GraphUtils.vertexString(feed)); - actual.add(feed.<String>getProperty(RelationshipProperty.NAME.getName())); - } - } - } - - Assert.assertTrue(actual.containsAll(expected), "Actual does not contain expected: " + actual); - } - - private void verifyFeedsOwnedByUserAndClassification(String feedType, String classification, - List<String> expected) { - List<String> actual = new ArrayList<>(); - Vertex userVertex = getEntityVertex(FALCON_USER, RelationshipType.USER); - for (Vertex feed : userVertex.getVertices(Direction.IN, RelationshipLabel.USER.getName())) { - if (feed.getProperty(RelationshipProperty.TYPE.getName()).equals(feedType)) { - for (Vertex classVertex : feed.getVertices(Direction.OUT, "classified-as")) { - if (classVertex.getProperty(RelationshipProperty.NAME.getName()) - .equals(classification)) { - actual.add(feed.<String>getProperty(RelationshipProperty.NAME.getName())); - System.out.println(classification + " feed owned by falcon-user -> " - + GraphUtils.vertexString(feed)); - } - } - } - } - Assert.assertTrue(actual.containsAll(expected), - "Actual does not contain expected: " + actual); - } - - public long getVerticesCount(final Graph graph) { - long count = 0; - for (Vertex ignored : graph.getVertices()) { - count++; - } - - return count; - } - - public long getEdgesCount(final Graph graph) { - long count = 0; - for (Edge ignored : graph.getEdges()) { - count++; - } - - return count; - } - - private void verifyLineageGraph(String feedType) { - List<String> expectedFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z", - "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"); - List<String> secureFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z", - "clicks-feed/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"); - List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z", - "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"); - verifyLineageGraph(feedType, expectedFeeds, secureFeeds, ownedAndSecureFeeds); - } - - private void verifyLineageGraph(String feedType, List<String> expectedFeeds, - List<String> secureFeeds, List<String> ownedAndSecureFeeds) { - // feeds owned by a user - List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType); - Assert.assertTrue(feedNamesOwnedByUser.containsAll(expectedFeeds)); - - Graph graph = service.getGraph(); - - Iterator<Vertex> vertices = graph.getVertices("name", "impression-feed/2014-01-01T00:00Z").iterator(); - Assert.assertTrue(vertices.hasNext()); - Vertex feedInstanceVertex = vertices.next(); - Assert.assertEquals(feedInstanceVertex.getProperty(RelationshipProperty.TYPE.getName()), - RelationshipType.FEED_INSTANCE.getName()); - - Object vertexId = feedInstanceVertex.getId(); - Vertex vertexById = graph.getVertex(vertexId); - Assert.assertEquals(vertexById, feedInstanceVertex); - - // feeds classified as secure - verifyFeedsClassifiedAsSecure(feedType, secureFeeds); - - // feeds owned by a user and classified as secure - verifyFeedsOwnedByUserAndClassification(feedType, "Financial", ownedAndSecureFeeds); - } - - private void verifyLineageGraphForReplicationOrEviction(String feedName, String feedInstanceDataPath, - WorkflowExecutionContext context, - RelationshipLabel edgeLabel) throws Exception { - String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(feedName - , context.getClusterName(), feedInstanceDataPath, context.getNominalTimeAsISO8601()); - Vertex feedVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE); - - Edge edge = feedVertex.getEdges(Direction.OUT, edgeLabel.getName()) - .iterator().next(); - Assert.assertNotNull(edge); - Assert.assertEquals(edge.getProperty(RelationshipProperty.TIMESTAMP.getName()) - , context.getTimeStampAsISO8601()); - - Vertex clusterVertex = edge.getVertex(Direction.IN); - Assert.assertEquals(clusterVertex.getProperty(RelationshipProperty.NAME.getName()), context.getClusterName()); - } - - private void verifyLineageGraphForJobCounters(WorkflowExecutionContext context) throws Exception { - Vertex processVertex = getEntityVertex(PROCESS_ENTITY_NAME, - RelationshipType.PROCESS_ENTITY); - Assert.assertEquals(processVertex.getProperty("name"), PROCESS_ENTITY_NAME); - Assert.assertTrue(context.getCounters().length()>0); - } - - private static String[] getTestMessageArgs(EntityOperations operation, String wfName, String outputFeedNames, - String feedInstancePaths, String falconInputPaths, - String falconInputFeeds) { - String cluster; - if (EntityOperations.REPLICATE == operation) { - cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME; - } else { - cluster = CLUSTER_ENTITY_NAME; - } - - return new String[]{ - "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster, - "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"), - "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME, - "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME, - "-" + WorkflowExecutionArgs.OPERATION.getName(), operation.toString(), - - "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), - (falconInputFeeds != null ? falconInputFeeds : INPUT_FEED_NAMES), - "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), - (falconInputPaths != null ? falconInputPaths : INPUT_INSTANCE_PATHS), - - "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), - (outputFeedNames != null ? outputFeedNames : OUTPUT_FEED_NAMES), - "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), - (feedInstancePaths != null ? feedInstancePaths : OUTPUT_INSTANCE_PATHS), - - "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00", - "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER, - "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1", - "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED", - "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME, - - "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie", - "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id", - "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), wfName, - "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION, - "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(), - - "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER, - "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true", - "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER, - "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true", - "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000", - - "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR, - }; - } - - private void setupForJobCounters() throws Exception { - cleanUp(); - service.init(); - // Add cluster - clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, - "classification=production"); - List<Feed> inFeeds = new ArrayList<>(); - List<Feed> outFeeds = new ArrayList<>(); - - createJobCountersFileForTest(); - // Add process - processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION, inFeeds, outFeeds); - } - - private void createJobCountersFileForTest() throws Exception { - Path counterFile = new Path(LOGS_DIR, "counter.txt"); - OutputStream out = null; - try { - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - new Path(LOGS_DIR).toUri()); - out = fs.create(counterFile); - out.write(COUNTERS.getBytes()); - out.flush(); - } finally { - out.close(); - } - } - - private void setup() throws Exception { - cleanUp(); - service.init(); - - // Add cluster - clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, - "classification=production"); - - addFeedsAndProcess(clusterEntity); - } - - private void addFeedsAndProcess(Cluster cluster) throws Exception { - // Add input and output feeds - Feed impressionsFeed = addFeedEntity("impression-feed", cluster, - "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}"); - List<Feed> inFeeds = new ArrayList<>(); - List<Feed> outFeeds = new ArrayList<>(); - inFeeds.add(impressionsFeed); - Feed clicksFeed = addFeedEntity("clicks-feed", cluster, - "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}"); - inFeeds.add(clicksFeed); - Feed join1Feed = addFeedEntity("imp-click-join1", cluster, - "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}"); - outFeeds.add(join1Feed); - Feed join2Feed = addFeedEntity("imp-click-join2", cluster, - "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}"); - outFeeds.add(join2Feed); - processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION, inFeeds, outFeeds); - } - - private void setupForLineageReplication() throws Exception { - cleanUp(); - service.init(); - - List<Feed> inFeeds = new ArrayList<>(); - List<Feed> outFeeds = new ArrayList<>(); - - addClusterAndFeedForReplication(inFeeds); - - // Add output feed - Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity, - "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}"); - outFeeds.add(join1Feed); - - processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION, inFeeds, outFeeds); - - // GENERATE WF should have run before this to create all instance related vertices - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1", - "jail://global:00/falcon/imp-click-join1/20140101", - "jail://global:00/falcon/raw-click/primary/20140101", - REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - } - - private void addClusterAndFeedForReplication(List<Feed> inFeeds) throws Exception { - // Add cluster - clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, - "classification=production"); - // Add backup cluster - Cluster bcpCluster = addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp"); - - Cluster[] clusters = {clusterEntity, bcpCluster}; - - // Add feed - Feed rawFeed = addFeedEntity(REPLICATED_FEED, clusters, - "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/raw-click/${YEAR}/${MONTH}/${DAY}"); - // Add uri template for each cluster - for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : rawFeed.getClusters().getClusters()) { - if (feedCluster.getName().equals(CLUSTER_ENTITY_NAME)) { - addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM, - "/falcon/raw-click/primary/${YEAR}/${MONTH}/${DAY}"); - } else { - addStorage(feedCluster, rawFeed, Storage.TYPE.FILESYSTEM, - "/falcon/raw-click/bcp/${YEAR}/${MONTH}/${DAY}"); - } - } - - // update config store - try { - configStore.initiateUpdate(rawFeed); - configStore.update(EntityType.FEED, rawFeed); - } finally { - configStore.cleanupUpdateInit(); - } - inFeeds.add(rawFeed); - } - - private void setupForLineageEviction() throws Exception { - setup(); - - // GENERATE WF should have run before this to create all instance related vertices - WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs( - EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, - "imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null, null), - WorkflowExecutionContext.Type.POST_PROCESSING); - service.onSuccess(context); - } - - private void setupForNoDateInFeedPath() throws Exception { - cleanUp(); - service.init(); - - // Add cluster - clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, - "classification=production"); - List<Feed> inFeeds = new ArrayList<>(); - List<Feed> outFeeds = new ArrayList<>(); - // Add input and output feeds - Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity, - "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/impression-feed"); - inFeeds.add(impressionsFeed); - Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity, - "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM, - "/falcon/clicks-feed"); - inFeeds.add(clicksFeed); - Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity, - "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join1"); - outFeeds.add(join1Feed); - Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity, - "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM, - "/falcon/imp-click-join2"); - outFeeds.add(join2Feed); - processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, - "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, - WORKFLOW_VERSION, inFeeds, outFeeds); - - } - - private void cleanUp() throws Exception { - cleanupGraphStore(service.getGraph()); - cleanupConfigurationStore(configStore); - service.destroy(); - } - - private void cleanupGraphStore(Graph graph) { - for (Edge edge : graph.getEdges()) { - graph.removeEdge(edge); - } - - for (Vertex vertex : graph.getVertices()) { - graph.removeVertex(vertex); - } - - graph.shutdown(); - } - - private static void cleanupConfigurationStore(ConfigurationStore store) throws Exception { - for (EntityType type : EntityType.values()) { - Collection<String> entities = store.getEntities(type); - for (String entity : entities) { - store.remove(type, entity); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java b/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java deleted file mode 100644 index 0f2ee7b..0000000 --- a/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.retention; - -import org.apache.falcon.cluster.util.EmbeddedCluster; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; - -/** - * Unit test for EvictedInstanceSerDe. - */ -public class EvictedInstanceSerDeTest { - - private EmbeddedCluster cluster; - private FileSystem fs; - private Path csvFilePath; - private StringBuffer evictedInstancePaths = new StringBuffer( - "thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2010") - .append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR) - .append("thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2011"); - - @BeforeClass - public void start() throws Exception { - cluster = EmbeddedCluster.newCluster("test"); - String hdfsUrl = cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY); - - fs = FileSystem.get(cluster.getConf()); - csvFilePath = new Path(hdfsUrl + "/falcon/staging/feed/instancePaths-2014-10-01-01-00.csv"); - } - - @AfterClass - public void close() throws Exception { - cluster.shutdown(); - } - - @Test - public void testSerializeEvictedInstancePathsForNoEviction() throws Exception { - EvictedInstanceSerDe.serializeEvictedInstancePaths(fs, csvFilePath, new StringBuffer()); - - Assert.assertEquals(readLogFile(csvFilePath), - EvictedInstanceSerDe.INSTANCEPATH_PREFIX); - } - - @Test - public void testSerializeEvictedInstancePathsWithEviction() throws Exception { - EvictedInstanceSerDe.serializeEvictedInstancePaths(fs, csvFilePath, evictedInstancePaths); - Assert.assertEquals(readLogFile(csvFilePath), evictedInstancePaths.toString()); - } - - @Test(dependsOnMethods = "testSerializeEvictedInstancePathsForNoEviction") - public void testDeserializeEvictedInstancePathsForNoEviction() throws Exception { - String[] instancePaths = EvictedInstanceSerDe.deserializeEvictedInstancePaths(fs, csvFilePath); - Assert.assertEquals(instancePaths.length, 0); - } - - @Test(dependsOnMethods = "testSerializeEvictedInstancePathsWithEviction") - public void testDeserializeEvictedInstancePathsWithEviction() throws Exception { - String[] instancePaths = EvictedInstanceSerDe.deserializeEvictedInstancePaths(fs, csvFilePath); - Assert.assertEquals(instancePaths.length, 2); - Assert.assertTrue(instancePaths[0].equals( - "thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2010")); - Assert.assertTrue(instancePaths[1].equals( - "thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2011")); - - } - - private String readLogFile(Path logFile) throws IOException { - ByteArrayOutputStream writer = new ByteArrayOutputStream(); - InputStream date = fs.open(logFile); - IOUtils.copyBytes(date, writer, 4096, true); - return writer.toString(); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java b/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java deleted file mode 100644 index 7979fe0..0000000 --- a/common/src/test/java/org/apache/falcon/security/AuthenticationInitializationServiceTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.security; - -import org.apache.falcon.FalconException; -import org.apache.falcon.util.FalconTestUtil; -import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; -import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import java.io.File; - - -/** - * Unit test for AuthenticationInitializationService that employs mocks. - */ -public class AuthenticationInitializationServiceTest { - - private AuthenticationInitializationService authenticationService; - - @Mock - private UserGroupInformation mockLoginUser; - - @BeforeClass - public void setUp() { - MockitoAnnotations.initMocks(this); - - authenticationService = new AuthenticationInitializationService(); - } - - @Test - public void testGetName() { - Assert.assertEquals("Authentication initialization service", - authenticationService.getName()); - } - - @Test - public void testInitForSimpleAuthenticationMethod() { - try { - StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, - PseudoAuthenticationHandler.TYPE); - authenticationService.init(); - - UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); - Assert.assertFalse(loginUser.isFromKeytab()); - Assert.assertEquals(loginUser.getAuthenticationMethod().name().toLowerCase(), - PseudoAuthenticationHandler.TYPE); - Assert.assertEquals(System.getProperty("user.name"), loginUser.getUserName()); - } catch (Exception e) { - Assert.fail("AuthenticationInitializationService init failed.", e); - } - } - - @Test - public void testKerberosAuthenticationWithKeytabFileDoesNotExist() { - try { - StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, - KerberosAuthenticationHandler.TYPE); - StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_KEYTAB, "/blah/blah"); - authenticationService.init(); - Assert.fail("The keytab file does not exist! must have been thrown."); - } catch (Exception e) { - Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class); - } - } - - @Test - public void testKerberosAuthenticationWithKeytabFileIsADirectory() { - try { - StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, - KerberosAuthenticationHandler.TYPE); - StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_KEYTAB, "/tmp/"); - authenticationService.init(); - Assert.fail("The keytab file cannot be a directory! must have been thrown."); - } catch (Exception e) { - Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class); - } - } - - @Test - public void testKerberosAuthenticationWithKeytabFileNotReadable() { - File tempFile = new File(".keytabFile"); - try { - assert tempFile.createNewFile(); - assert tempFile.setReadable(false); - - StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, - KerberosAuthenticationHandler.TYPE); - StartupProperties.get().setProperty( - AuthenticationInitializationService.KERBEROS_KEYTAB, tempFile.toString()); - authenticationService.init(); - Assert.fail("The keytab file is not readable! must have been thrown."); - } catch (Exception e) { - Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class); - } finally { - assert tempFile.delete(); - } - } - - @Test (enabled = false) - public void testInitForKerberosAuthenticationMethod() throws FalconException { - Mockito.when(mockLoginUser.getAuthenticationMethod()) - .thenReturn(UserGroupInformation.AuthenticationMethod.KERBEROS); - Mockito.when(mockLoginUser.getUserName()).thenReturn(FalconTestUtil.TEST_USER_1); - Mockito.when(mockLoginUser.isFromKeytab()).thenReturn(Boolean.TRUE); - - StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, - KerberosAuthenticationHandler.TYPE); - StartupProperties.get().setProperty( - AuthenticationInitializationService.KERBEROS_KEYTAB, "falcon.kerberos.keytab"); - StartupProperties.get().setProperty(AuthenticationInitializationService.KERBEROS_PRINCIPAL, - FalconTestUtil.TEST_USER_1); - - authenticationService.init(); - - Assert.assertTrue(mockLoginUser.isFromKeytab()); - Assert.assertEquals(mockLoginUser.getAuthenticationMethod().name(), - KerberosAuthenticationHandler.TYPE); - Assert.assertEquals(FalconTestUtil.TEST_USER_1, mockLoginUser.getUserName()); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java b/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java deleted file mode 100644 index 5cc6c70..0000000 --- a/common/src/test/java/org/apache/falcon/security/CurrentUserTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.security; - -import org.apache.falcon.cluster.util.EntityBuilderTestUtil; -import org.apache.falcon.service.GroupsService; -import org.apache.falcon.service.ProxyUserService; -import org.apache.falcon.service.Services; -import org.apache.falcon.util.RuntimeProperties; -import org.apache.falcon.util.FalconTestUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -/** - * Test for current user's thread safety. - */ -public class CurrentUserTest { - private ProxyUserService proxyUserService; - private GroupsService groupsService; - - @BeforeClass - public void setUp() throws Exception { - Services.get().register(new ProxyUserService()); - Services.get().register(new GroupsService()); - groupsService = Services.get().getService(GroupsService.SERVICE_NAME); - proxyUserService = Services.get().getService(ProxyUserService.SERVICE_NAME); - groupsService.init(); - - RuntimeProperties.get().setProperty("falcon.service.ProxyUserService.proxyuser.foo.hosts", "*"); - RuntimeProperties.get().setProperty("falcon.service.ProxyUserService.proxyuser.foo.groups", "*"); - proxyUserService.init(); - } - - @AfterClass - public void tearDown() throws Exception { - proxyUserService.destroy(); - groupsService.destroy(); - Services.get().reset(); - } - - @AfterMethod - public void cleanUp() { - CurrentUser.clear(); - } - - @Test(threadPoolSize = 10, invocationCount = 10, timeOut = 10000) - public void testGetUser() throws Exception { - String id = Long.toString(System.nanoTime()); - CurrentUser.authenticate(id); - Assert.assertEquals(CurrentUser.getAuthenticatedUser(), id); - Assert.assertEquals(CurrentUser.getUser(), id); - } - - @Test (expectedExceptions = IllegalStateException.class) - public void testAuthenticateBadUser() throws Exception { - CurrentUser.authenticate(""); - } - - @Test (expectedExceptions = IllegalStateException.class) - public void testGetAuthenticatedUserInvalid() throws Exception { - CurrentUser.getAuthenticatedUser(); - } - - @Test (expectedExceptions = IllegalStateException.class) - public void testGetUserInvalid() throws Exception { - CurrentUser.getUser(); - } - - @Test (expectedExceptions = IllegalStateException.class) - public void testProxyBadUser() throws Exception { - CurrentUser.authenticate(FalconTestUtil.TEST_USER_1); - CurrentUser.proxy("", ""); - } - - @Test (expectedExceptions = IllegalStateException.class) - public void testProxyWithNoAuth() throws Exception { - CurrentUser.proxy(FalconTestUtil.TEST_USER_1, "falcon"); - } - - @Test - public void testGetProxyUserForAuthenticatedUser() throws Exception { - CurrentUser.authenticate("proxy"); - UserGroupInformation proxyUgi = CurrentUser.getProxyUGI(); - Assert.assertNotNull(proxyUgi); - Assert.assertEquals(proxyUgi.getUserName(), "proxy"); - } - - @Test - public void testProxy() throws Exception { - CurrentUser.authenticate("real"); - - CurrentUser.proxy(EntityBuilderTestUtil.USER, "users"); - UserGroupInformation proxyUgi = CurrentUser.getProxyUGI(); - Assert.assertNotNull(proxyUgi); - Assert.assertEquals(proxyUgi.getUserName(), EntityBuilderTestUtil.USER); - - Assert.assertEquals(CurrentUser.getAuthenticatedUser(), "real"); - Assert.assertEquals(CurrentUser.getUser(), EntityBuilderTestUtil.USER); - } - - @Test - public void testProxySameUser() throws Exception { - CurrentUser.authenticate(FalconTestUtil.TEST_USER_1); - - CurrentUser.proxy(FalconTestUtil.TEST_USER_1, "users"); - UserGroupInformation proxyUgi = CurrentUser.getProxyUGI(); - Assert.assertNotNull(proxyUgi); - Assert.assertEquals(proxyUgi.getUserName(), FalconTestUtil.TEST_USER_1); - - Assert.assertEquals(CurrentUser.getAuthenticatedUser(), FalconTestUtil.TEST_USER_1); - Assert.assertEquals(CurrentUser.getUser(), FalconTestUtil.TEST_USER_1); - } - - @Test - public void testSuperUser() throws Exception { - CurrentUser.authenticate(EntityBuilderTestUtil.USER); - CurrentUser.proxy("proxy", "users"); - - UserGroupInformation proxyUgi = CurrentUser.getProxyUGI(); - Assert.assertNotNull(proxyUgi); - Assert.assertEquals(proxyUgi.getUserName(), "proxy"); - - Assert.assertEquals(CurrentUser.getAuthenticatedUser(), EntityBuilderTestUtil.USER); - Assert.assertEquals(CurrentUser.getUser(), "proxy"); - } - - @Test(expectedExceptions = IllegalStateException.class) - public void testProxyDoAsUserWithNoAuth() throws Exception { - CurrentUser.proxyDoAsUser("falcon", "localhost"); - } - - @Test - public void testProxyDoAsUser() throws Exception { - CurrentUser.authenticate("foo"); - - CurrentUser.proxyDoAsUser(EntityBuilderTestUtil.USER, "localhost"); - UserGroupInformation proxyUgi = CurrentUser.getProxyUGI(); - Assert.assertNotNull(proxyUgi); - Assert.assertEquals(proxyUgi.getUserName(), EntityBuilderTestUtil.USER); - - Assert.assertEquals(CurrentUser.getAuthenticatedUser(), "foo"); - Assert.assertEquals(CurrentUser.getUser(), EntityBuilderTestUtil.USER); - } - - @Test - public void testProxyDoAsSameUser() throws Exception { - CurrentUser.authenticate("foo"); - - CurrentUser.proxyDoAsUser("foo", "localhost"); - UserGroupInformation proxyUgi = CurrentUser.getProxyUGI(); - Assert.assertNotNull(proxyUgi); - Assert.assertEquals(proxyUgi.getUserName(), "foo"); - - Assert.assertEquals(CurrentUser.getAuthenticatedUser(), "foo"); - Assert.assertEquals(CurrentUser.getUser(), "foo"); - } -}
