http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java b/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java deleted file mode 100644 index 0729f15..0000000 --- a/common/src/test/java/org/apache/falcon/entity/ProcessHelperTest.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.Clusters; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Inputs; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Outputs; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.resource.SchedulableEntityInstance; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashSet; -import java.util.Set; -import java.util.TimeZone; - - -/** - * Tests for ProcessHelper methods. - */ -public class ProcessHelperTest extends AbstractTestBase { - private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); - private ConfigurationStore store; - - @BeforeClass - public void init() throws Exception { - initConfigStore(); - } - - @BeforeMethod - public void setUp() throws Exception { - cleanupStore(); - store = ConfigurationStore.get(); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void testBeforeStartInstance() throws FalconException, ParseException { - // create a process with input feeds - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); - - // find the input Feed instances time - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); - Inputs inputs = new Inputs(); - Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false); - inputs.getInputs().add(input); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - - Date processInstanceDate = getDate("2012-02-28 10:27 UTC"); - ProcessHelper.getInputFeedInstances(process, processInstanceDate, cluster, false); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void testEqualsToEndInstance() throws FalconException, ParseException { - // create a process with input feeds - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); - - // find the input Feed instances time - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); - Inputs inputs = new Inputs(); - Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false); - inputs.getInputs().add(input); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - - Date processInstanceDate = getDate("2012-02-28 10:47 UTC"); - ProcessHelper.getInputFeedInstances(process, processInstanceDate, cluster, false); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void testOutOfSyncInstance() throws FalconException, ParseException { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); - Inputs inputs = new Inputs(); - Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false); - inputs.getInputs().add(input); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - Date processInstanceDate = getDate("2012-02-28 10:40 UTC"); - ProcessHelper.getInputFeedInstances(process, processInstanceDate, cluster, false); - } - - @Test - public void testProcessWithNoDependencies() throws Exception { - Cluster cluster = publishCluster(); - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); - store.publish(EntityType.PROCESS, process); - Date processInstanceDate = getDate("2012-02-28 10:37 UTC"); - Set<SchedulableEntityInstance> inputFeedInstances = ProcessHelper.getInputFeedInstances(process, - processInstanceDate, cluster, false); - Assert.assertTrue(inputFeedInstances.isEmpty()); - Set<SchedulableEntityInstance> res = ProcessHelper.getOutputFeedInstances(process, processInstanceDate, - cluster); - Assert.assertTrue(res.isEmpty()); - } - - @Test - public void testGetInputFeedInstances() throws FalconException, ParseException { - // create a process with input feeds - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); - - // find the input Feed instances time - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2016-02-28 10:37 UTC"); - Inputs inputs = new Inputs(); - Input input = getInput("inputFeed", feed.getName(), "now(0,-20)", "now(0,0)", false); - inputs.getInputs().add(input); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - - Date processInstanceDate = getDate("2012-02-28 10:37 UTC"); - Set<SchedulableEntityInstance> inputFeedInstances = ProcessHelper.getInputFeedInstances(process, - processInstanceDate, cluster, false); - Assert.assertEquals(inputFeedInstances.size(), 5); - - Set<SchedulableEntityInstance> expectedInputFeedInstances = new HashSet<>(); - String[] inputInstances = { "2012-02-28 10:15 UTC", "2012-02-28 10:20 UTC", "2012-02-28 10:25 UTC", - "2012-02-28 10:30 UTC", "2012-02-28 10:35 UTC", }; - for (String d : inputInstances) { - SchedulableEntityInstance i = new SchedulableEntityInstance(feed.getName(), cluster.getName(), - getDate(d), EntityType.FEED); - i.setTags(SchedulableEntityInstance.INPUT); - expectedInputFeedInstances.add(i); - } - Assert.assertTrue(inputFeedInstances.equals(expectedInputFeedInstances)); - } - - @Test - public void testGetOutputFeedInstances() throws FalconException, ParseException { - // create a process with input feeds - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "days(1)", "2012-02-27 11:00 UTC", "2016-02-28 11:00 UTC"); - Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Outputs outputs = new Outputs(); - outputs.getOutputs().add(getOutput("outputFeed", feed.getName(), "now(0,0)")); - process.setOutputs(outputs); - store.publish(EntityType.PROCESS, process); - - Set<SchedulableEntityInstance> result = ProcessHelper.getOutputFeedInstances(process, - getDate("2012-02-28 10:00 UTC"), cluster); - - Set<SchedulableEntityInstance> expected = new HashSet<>(); - SchedulableEntityInstance ins = new SchedulableEntityInstance(feed.getName(), cluster.getName(), - getDate("2012-02-27 11:00 UTC"), EntityType.FEED); - ins.setTags(SchedulableEntityInstance.OUTPUT); - expected.add(ins); - - Assert.assertEquals(result, expected); - - } - - private org.apache.falcon.entity.v0.process.Validity getProcessValidity(String start, String end) throws - ParseException { - - org.apache.falcon.entity.v0.process.Validity validity = new org.apache.falcon.entity.v0.process.Validity(); - validity.setStart(getDate(start)); - validity.setEnd(getDate(end)); - return validity; - } - - private Date getDate(String dateString) throws ParseException { - return new SimpleDateFormat("yyyy-MM-dd HH:mm Z").parse(dateString); - } - - private org.apache.falcon.entity.v0.feed.Validity getFeedValidity(String start, String end) throws ParseException { - org.apache.falcon.entity.v0.feed.Validity validity = new org.apache.falcon.entity.v0.feed.Validity(); - validity.setStart(getDate(start)); - validity.setEnd(getDate(end)); - return validity; - } - - private Input getInput(String name, String feedName, String start, String end, boolean isOptional) { - Input inFeed = new Input(); - inFeed.setName(name); - inFeed.setFeed(feedName); - inFeed.setStart(start); - inFeed.setEnd(end); - inFeed.setOptional(isOptional); - return inFeed; - } - - private Output getOutput(String name, String feedName, String instance) { - Output output = new Output(); - output.setInstance(instance); - output.setFeed(feedName); - output.setName(name); - return output; - } - - private Cluster publishCluster() throws FalconException { - Cluster cluster = new Cluster(); - cluster.setName("feedCluster"); - cluster.setColo("colo"); - store.publish(EntityType.CLUSTER, cluster); - return cluster; - - } - - private Feed publishFeed(Cluster cluster, String frequency, String start, String end) - throws FalconException, ParseException { - - Feed feed = new Feed(); - feed.setName("feed"); - Frequency f = new Frequency(frequency); - feed.setFrequency(f); - feed.setTimezone(UTC); - Clusters fClusters = new Clusters(); - org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster(); - fCluster.setName(cluster.getName()); - fCluster.setValidity(getFeedValidity(start, end)); - fClusters.getClusters().add(fCluster); - feed.setClusters(fClusters); - store.publish(EntityType.FEED, feed); - - return feed; - } - - private Process prepareProcess(Cluster cluster, String frequency, String start, String end) throws ParseException { - Process process = new Process(); - process.setName("process"); - process.setTimezone(UTC); - org.apache.falcon.entity.v0.process.Clusters pClusters = new org.apache.falcon.entity.v0.process.Clusters(); - org.apache.falcon.entity.v0.process.Cluster pCluster = new org.apache.falcon.entity.v0.process.Cluster(); - pCluster.setName(cluster.getName()); - org.apache.falcon.entity.v0.process.Validity validity = getProcessValidity(start, end); - pCluster.setValidity(validity); - pClusters.getClusters().add(pCluster); - process.setClusters(pClusters); - Frequency f = new Frequency(frequency); - process.setFrequency(f); - return process; - } -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java b/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java deleted file mode 100644 index eb0127d..0000000 --- a/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java +++ /dev/null @@ -1,306 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity; - -import org.apache.falcon.entity.parser.ClusterEntityParser; -import org.apache.falcon.entity.parser.EntityParserFactory; -import org.apache.falcon.entity.parser.FeedEntityParser; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.cluster.Interface; -import org.apache.falcon.entity.v0.cluster.Interfacetype; -import org.apache.falcon.entity.v0.feed.CatalogTable; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.io.InputStream; -import java.util.List; - -/** - * Test for storage factory methods in feed helper. - */ -public class StorageFactoryTest { - - private static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml"; - - private static final String FS_FEED_UNIFORM = "/config/feed/feed-0.1.xml"; - private static final String FS_FEED_OVERRIDE = "/config/feed/feed-0.2.xml"; - - private static final String TABLE_FEED_UNIFORM = "/config/feed/hive-table-feed.xml"; - private static final String TABLE_FEED_OVERRIDE = "/config/feed/hive-table-feed-out.xml"; - - private static final String OVERRIDE_TBL_LOC = "/testCluster/clicks-summary/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"; - - private final ClusterEntityParser clusterParser = - (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER); - private final FeedEntityParser feedParser = - (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED); - - private Cluster clusterEntity; - private Feed fsFeedWithUniformStorage; - private Feed fsFeedWithOverriddenStorage; - private Feed tableFeedWithUniformStorage; - private Feed tableFeedWithOverriddenStorage; - - @BeforeClass - public void setup() throws Exception { - InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML); - clusterEntity = clusterParser.parse(stream); - stream.close(); - Interface registry = ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY); - registry.setEndpoint("thrift://localhost:9083"); - ConfigurationStore.get().publish(EntityType.CLUSTER, clusterEntity); - - stream = this.getClass().getResourceAsStream(FS_FEED_UNIFORM); - fsFeedWithUniformStorage = feedParser.parse(stream); - stream.close(); - - stream = this.getClass().getResourceAsStream(FS_FEED_OVERRIDE); - fsFeedWithOverriddenStorage = feedParser.parse(stream); - stream.close(); - - stream = this.getClass().getResourceAsStream(TABLE_FEED_UNIFORM); - tableFeedWithUniformStorage = feedParser.parse(stream); - stream.close(); - - stream = this.getClass().getResourceAsStream(TABLE_FEED_OVERRIDE); - tableFeedWithOverriddenStorage = feedParser.parse(stream); - stream.close(); - } - - @AfterClass - public void tearDown() throws Exception { - ConfigurationStore.get().remove(EntityType.CLUSTER, clusterEntity.getName()); - } - - @DataProvider (name = "locationsDataProvider") - private Object[][] createLocationsDataProvider() { - return new Object[][] { - {fsFeedWithUniformStorage, "/projects/falcon/clicks"}, - {fsFeedWithOverriddenStorage, "/testCluster/projects/falcon/clicks"}, - }; - } - - @Test (dataProvider = "locationsDataProvider") - public void testGetLocations(Feed feed, String dataPath) { - org.apache.falcon.entity.v0.feed.Cluster feedCluster = - FeedHelper.getCluster(feed, clusterEntity.getName()); - List<Location> locations = FeedHelper.getLocations(feedCluster, feed); - for (Location location : locations) { - if (location.getType() == LocationType.DATA) { - Assert.assertEquals(location.getPath(), dataPath); - } - } - } - - @DataProvider (name = "tableDataProvider") - private Object[][] createTableDataProvider() { - return new Object[][] { - {tableFeedWithUniformStorage, "catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"}, - {tableFeedWithOverriddenStorage, "catalog:testCluster:clicks-summary#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"}, - }; - } - - @Test (dataProvider = "tableDataProvider") - public void testGetTable(Feed feed, String dataPath) { - org.apache.falcon.entity.v0.feed.Cluster feedCluster = - FeedHelper.getCluster(feed, clusterEntity.getName()); - CatalogTable table = FeedHelper.getTable(feedCluster, feed); - Assert.assertEquals(table.getUri(), dataPath); - } - - private static final String UNIFORM_TABLE = "${hcatNode}/default/clicks/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"; - private static final String OVERRIDETBL = "${hcatNode}/default/clicks-summary/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"; - - - @DataProvider (name = "uniformFeedStorageDataProvider") - private Object[][] createUniformFeedStorageDataProvider() { - return new Object[][] { - {fsFeedWithUniformStorage, Storage.TYPE.FILESYSTEM, "${nameNode}/projects/falcon/clicks"}, - {fsFeedWithOverriddenStorage, Storage.TYPE.FILESYSTEM, "${nameNode}/projects/falcon/clicks"}, - {tableFeedWithUniformStorage, Storage.TYPE.TABLE, UNIFORM_TABLE}, - {tableFeedWithOverriddenStorage, Storage.TYPE.TABLE, OVERRIDETBL}, - }; - } - - @Test (dataProvider = "uniformFeedStorageDataProvider") - public void testCreateStorageWithFeed(Feed feed, Storage.TYPE storageType, - String dataLocation) throws Exception { - Storage storage = FeedHelper.createStorage(feed); - Assert.assertEquals(storage.getType(), storageType); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation); - - if (storageType == Storage.TYPE.TABLE) { - Assert.assertEquals(((CatalogStorage) storage).getDatabase(), "default"); - } - } - - @DataProvider (name = "overriddenFeedStorageDataProvider") - private Object[][] createFeedStorageDataProvider() { - return new Object[][] { - {fsFeedWithUniformStorage, Storage.TYPE.FILESYSTEM, "/projects/falcon/clicks"}, - {fsFeedWithOverriddenStorage, Storage.TYPE.FILESYSTEM, "/testCluster/projects/falcon/clicks"}, - {tableFeedWithUniformStorage, Storage.TYPE.TABLE, "/default/clicks/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"}, - {tableFeedWithOverriddenStorage, Storage.TYPE.TABLE, OVERRIDE_TBL_LOC}, - }; - } - - @Test (dataProvider = "overriddenFeedStorageDataProvider") - public void testCreateStorageWithFeedAndClusterEntity(Feed feed, Storage.TYPE storageType, - String dataLocation) throws Exception { - Storage storage = FeedHelper.createStorage(clusterEntity, feed); - Assert.assertEquals(storage.getType(), storageType); - - if (storageType == Storage.TYPE.FILESYSTEM) { - dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation; - } else if (storageType == Storage.TYPE.TABLE) { - dataLocation = - ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation; - } - - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation); - } - - @Test (dataProvider = "overriddenFeedStorageDataProvider") - public void testCreateStorageWithFeedAndClusterName(Feed feed, Storage.TYPE storageType, - String dataLocation) throws Exception { - Storage storage = FeedHelper.createStorage(clusterEntity.getName(), feed); - Assert.assertEquals(storage.getType(), storageType); - - if (storageType == Storage.TYPE.FILESYSTEM) { - dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation; - } else if (storageType == Storage.TYPE.TABLE) { - dataLocation = - ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation; - } - - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation); - } - - @Test (dataProvider = "overriddenFeedStorageDataProvider") - public void testCreateStorageWithFeedAndFeedCluster(Feed feed, Storage.TYPE storageType, - String dataLocation) throws Exception { - org.apache.falcon.entity.v0.feed.Cluster feedCluster = - FeedHelper.getCluster(feed, clusterEntity.getName()); - Storage storage = FeedHelper.createStorage(feedCluster, feed); - Assert.assertEquals(storage.getType(), storageType); - - if (storageType == Storage.TYPE.FILESYSTEM) { - dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation; - } else if (storageType == Storage.TYPE.TABLE) { - dataLocation = - ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation; - } - - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation); - } - - @Test (dataProvider = "overriddenFeedStorageDataProvider") - public void testCreateStorageWithAll(Feed feed, Storage.TYPE storageType, - String dataLocation) throws Exception { - org.apache.falcon.entity.v0.feed.Cluster feedCluster = - FeedHelper.getCluster(feed, clusterEntity.getName()); - Storage storage = FeedHelper.createStorage(feedCluster, feed, clusterEntity); - Assert.assertEquals(storage.getType(), storageType); - - if (storageType == Storage.TYPE.FILESYSTEM) { - dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation; - } else if (storageType == Storage.TYPE.TABLE) { - dataLocation = - ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation; - } - - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation); - } - - @Test (dataProvider = "overriddenFeedStorageDataProvider") - public void testCreateReadOnlyStorage(Feed feed, Storage.TYPE storageType, - String dataLocation) throws Exception { - Storage readOnlyStorage = FeedHelper.createReadOnlyStorage(clusterEntity, feed); - Assert.assertEquals(readOnlyStorage.getType(), storageType); - - if (storageType == Storage.TYPE.FILESYSTEM) { - dataLocation = ClusterHelper.getReadOnlyStorageUrl(clusterEntity) + dataLocation; - } else if (storageType == Storage.TYPE.TABLE) { - dataLocation = - ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation; - } - - Assert.assertEquals(readOnlyStorage.getUriTemplate(LocationType.DATA), dataLocation); - } - - @DataProvider (name = "uriTemplateDataProvider") - private Object[][] createUriTemplateDataProvider() { - return new Object[][] { - {Storage.TYPE.FILESYSTEM, "/projects/falcon/clicks"}, - {Storage.TYPE.FILESYSTEM, "/testCluster/projects/falcon/clicks"}, - {Storage.TYPE.TABLE, "/default/clicks/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"}, - {Storage.TYPE.TABLE, OVERRIDE_TBL_LOC}, - }; - } - - @Test (dataProvider = "uriTemplateDataProvider") - public void testCreateStorageWithUriTemplate(Storage.TYPE storageType, - String dataLocation) throws Exception { - String uriTemplate = null; - if (storageType == Storage.TYPE.FILESYSTEM) { - uriTemplate = "DATA=" + ClusterHelper.getStorageUrl(clusterEntity) + dataLocation + "#"; - dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation; - } else if (storageType == Storage.TYPE.TABLE) { - uriTemplate = - ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation; - dataLocation = uriTemplate; - } - - Storage storage = FeedHelper.createStorage(storageType.name(), uriTemplate); - Assert.assertEquals(storage.getType(), storageType); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation); - } - - @DataProvider (name = "storageTypeDataProvider") - private Object[][] createStorageTypeDataProvider() { - return new Object[][] { - {fsFeedWithUniformStorage, Storage.TYPE.FILESYSTEM}, - {fsFeedWithOverriddenStorage, Storage.TYPE.FILESYSTEM}, - {tableFeedWithUniformStorage, Storage.TYPE.TABLE}, - {tableFeedWithOverriddenStorage, Storage.TYPE.TABLE}, - }; - } - - @Test (dataProvider = "storageTypeDataProvider") - public void testGetStorageTypeWithFeed(Feed feed, Storage.TYPE expectedStorageType) throws Exception { - Storage.TYPE actualStorageType = FeedHelper.getStorageType(feed); - Assert.assertEquals(actualStorageType, expectedStorageType); - - org.apache.falcon.entity.v0.feed.Cluster feedCluster = - FeedHelper.getCluster(feed, clusterEntity.getName()); - actualStorageType = FeedHelper.getStorageType(feed, feedCluster); - Assert.assertEquals(actualStorageType, expectedStorageType); - - actualStorageType = FeedHelper.getStorageType(feed, clusterEntity); - Assert.assertEquals(actualStorageType, expectedStorageType); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java b/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java deleted file mode 100644 index 5b1af78..0000000 --- a/common/src/test/java/org/apache/falcon/entity/TestWorkflowNameBuilder.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.entity; - -import org.apache.falcon.Pair; -import org.apache.falcon.Tag; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Process; -import org.testng.Assert; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.util.Arrays; - -/** - * Test for workflow name builder. - */ -public class TestWorkflowNameBuilder { - - @Test - public void getTagTest() { - Feed feed = new Feed(); - feed.setName("raw-logs"); - - WorkflowNameBuilder<Feed> builder = new WorkflowNameBuilder<Feed>(feed); - Tag tag = builder.getWorkflowTag("FALCON_FEED_RETENTION_raw-logs"); - Assert.assertEquals(tag, Tag.RETENTION); - - tag = builder.getWorkflowTag("FALCON_FEED_raw-logs"); - Assert.assertNull(tag); - - tag = builder.getWorkflowTag("FALCON_FEED_REPLICATION_raw-logs_corp1"); - Assert.assertEquals(tag, Tag.REPLICATION); - - } - - @Test - public void getSuffixesTest() { - Feed feed = new Feed(); - feed.setName("raw-logs"); - WorkflowNameBuilder<Feed> builder = new WorkflowNameBuilder<Feed>(feed); - - String suffixes = builder - .getWorkflowSuffixes("FALCON_FEED_REPLICATION_raw-logs_corp-1"); - Assert.assertEquals(suffixes, "_corp-1"); - - suffixes = builder - .getWorkflowSuffixes("FALCON_FEED_REPLICATION_raw-logs"); - Assert.assertEquals(suffixes, ""); - } - - @Test - public void workflowNameTest() { - Feed feed = new Feed(); - feed.setName("raw-logs"); - - WorkflowNameBuilder<Feed> builder = new WorkflowNameBuilder<Feed>(feed); - Assert.assertEquals(builder.getWorkflowName().toString(), - "FALCON_FEED_raw-logs"); - - builder.setTag(Tag.REPLICATION); - Assert.assertEquals(builder.getWorkflowName().toString(), - "FALCON_FEED_REPLICATION_raw-logs"); - - builder.setSuffixes(Arrays.asList("cluster1")); - Assert.assertEquals(builder.getWorkflowName().toString(), - "FALCON_FEED_REPLICATION_raw-logs_cluster1"); - - Process process = new Process(); - process.setName("agg-logs"); - WorkflowNameBuilder<Process> processBuilder = new WorkflowNameBuilder<Process>( - process); - processBuilder.setTag(Tag.DEFAULT); - Assert.assertEquals(processBuilder.getWorkflowName().toString(), - "FALCON_PROCESS_DEFAULT_agg-logs"); - - } - - @Test(dataProvider = "workflowNames") - public void workflowNameTypeTest(String wfName, Pair<String, EntityType> nameType) { - Assert.assertEquals(WorkflowNameBuilder.WorkflowName.getEntityNameAndType(wfName), nameType); - } - - @DataProvider(name = "workflowNames") - public Object[][] getWorkflowNames() { - return new Object[][] { - {"FALCON_PROCESS_DEFAULT_agg-logs", new Pair<>("agg-logs", EntityType.PROCESS)}, - {"FALCON_FEED_REPLICATION_raw-logs", new Pair<>("raw-logs", EntityType.FEED)}, - {"FALCON_FEED_RETENTION_logs2", new Pair<>("logs2", EntityType.FEED)}, - {"FALCON_FEED_REPLICATION_logs_colo1", new Pair<>("logs", EntityType.FEED)}, - }; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java b/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java deleted file mode 100644 index d4cf82c..0000000 --- a/common/src/test/java/org/apache/falcon/entity/lock/MemoryLocksTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity.lock; - -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.process.Process; -import org.testng.Assert; -import org.testng.annotations.Test; - -/** - * Test for Memory Locking mechanism used for schedule/update of entities. - */ - -public class MemoryLocksTest { - private static final String FEED_XML = "/config/feed/feed-0.1.xml"; - private static final String PROCESS_XML = "/config/process/process-0.1.xml"; - - @Test - public void testSuccessfulMemoryLockAcquisition() throws Exception { - MemoryLocks memoryLocks = MemoryLocks.getInstance(); - Entity feed = (Entity) EntityType.FEED.getUnmarshaller().unmarshal(this.getClass().getResource(FEED_XML)); - Assert.assertEquals(memoryLocks.acquireLock(feed, "test"), true); - memoryLocks.releaseLock(feed); - } - - @Test - public void testUnsuccessfulMemoryLockAcquisition() throws Exception { - MemoryLocks memoryLocks = MemoryLocks.getInstance(); - Entity feed = (Entity) EntityType.FEED.getUnmarshaller().unmarshal(this.getClass().getResource(FEED_XML)); - Assert.assertEquals(memoryLocks.acquireLock(feed, "test"), true); - Assert.assertEquals(memoryLocks.acquireLock(feed, "test"), false); - memoryLocks.releaseLock(feed); - } - - @Test - public void testDuplicateEntityNameLockAcquisition() throws Exception { - MemoryLocks memoryLocks = MemoryLocks.getInstance(); - //In case both feed & process have identical names, they shouldn't clash during updates - Entity feed = (Entity) EntityType.FEED.getUnmarshaller().unmarshal(this.getClass().getResource(FEED_XML)); - org.apache.falcon.entity.v0.process.Process process = (Process) EntityType.PROCESS.getUnmarshaller(). - unmarshal(this.getClass().getResource(PROCESS_XML)); - process.setName(feed.getName()); - Assert.assertEquals(memoryLocks.acquireLock(feed, "test"), true); - Assert.assertEquals(memoryLocks.acquireLock(process, "test"), true); - memoryLocks.releaseLock(feed); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java deleted file mode 100644 index f98b6e4..0000000 --- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java +++ /dev/null @@ -1,459 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity.parser; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.catalog.CatalogServiceFactory; -import org.apache.falcon.cluster.util.EmbeddedCluster; -import org.apache.falcon.entity.AbstractTestBase; -import org.apache.falcon.entity.ClusterHelper; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.cluster.ClusterLocationType; -import org.apache.falcon.entity.v0.cluster.Interface; -import org.apache.falcon.entity.v0.cluster.Interfacetype; -import org.apache.falcon.entity.v0.cluster.Location; -import org.apache.falcon.entity.v0.cluster.Locations; -import org.apache.falcon.entity.v0.cluster.Property; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; -import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import java.io.IOException; -import java.io.InputStream; -import java.io.StringWriter; - - -/** - * Test for validating cluster entity parsing. - */ -public class ClusterEntityParserTest extends AbstractTestBase { - - private final ClusterEntityParser parser = (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER); - private static final String CLUSTER_LOCATIONS_BASE_DIR = "/projects/falcon/ClusterEntityParserTestLocations/"; - - @Test - public void testParse() throws IOException, FalconException, JAXBException { - - InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML); - - Cluster cluster = parser.parse(stream); - ClusterHelper.getInterface(cluster, Interfacetype.WRITE) - .setEndpoint(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)); - Locations locations = getClusterLocations("staging0", "working0"); - cluster.setLocations(locations); - - Assert.assertNotNull(cluster); - Assert.assertEquals(cluster.getName(), "testCluster"); - - Interface execute = ClusterHelper.getInterface(cluster, Interfacetype.EXECUTE); - - Assert.assertEquals(execute.getEndpoint(), "localhost:8021"); - Assert.assertEquals(execute.getVersion(), "0.20.2"); - - Interface readonly = ClusterHelper.getInterface(cluster, Interfacetype.READONLY); - Assert.assertEquals(readonly.getEndpoint(), "hftp://localhost:50010"); - Assert.assertEquals(readonly.getVersion(), "0.20.2"); - - Interface write = ClusterHelper.getInterface(cluster, Interfacetype.WRITE); - //assertEquals(write.getEndpoint(), conf.get("fs.defaultFS")); - Assert.assertEquals(write.getVersion(), "0.20.2"); - - Interface workflow = ClusterHelper.getInterface(cluster, Interfacetype.WORKFLOW); - Assert.assertEquals(workflow.getEndpoint(), "http://localhost:11000/oozie/"); - Assert.assertEquals(workflow.getVersion(), "4.0"); - - Assert.assertEquals(ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath(), - CLUSTER_LOCATIONS_BASE_DIR + "staging0"); - Assert.assertEquals(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), - CLUSTER_LOCATIONS_BASE_DIR + "working0"); - - StringWriter stringWriter = new StringWriter(); - Marshaller marshaller = EntityType.CLUSTER.getMarshaller(); - marshaller.marshal(cluster, stringWriter); - System.out.println(stringWriter.toString()); - - Interface catalog = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY); - Assert.assertEquals(catalog.getEndpoint(), "http://localhost:48080/templeton/v1"); - Assert.assertEquals(catalog.getVersion(), "0.11.0"); - - - } - - @Test - public void testParseClusterWithoutRegistry() throws IOException, FalconException, JAXBException { - - StartupProperties.get().setProperty(CatalogServiceFactory.CATALOG_SERVICE, "thrift://localhost:9083"); - Assert.assertTrue(CatalogServiceFactory.isEnabled()); - - InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-no-registry.xml"); - Cluster cluster = parser.parse(stream); - - Interface catalog = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY); - Assert.assertNull(catalog); - - StartupProperties.get().remove(CatalogServiceFactory.CATALOG_SERVICE); - Assert.assertFalse(CatalogServiceFactory.isEnabled()); - - catalog = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY); - Assert.assertNull(catalog); - } - - @Test - public void testParseClusterWithoutMessaging() throws FalconException { - InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-no-messaging.xml"); - - // Parse should be successful - Cluster cluster = parser.parse(stream); - - Interface messaging = ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING); - Assert.assertNull(messaging); - - Assert.assertEquals(ClusterHelper.getMessageBrokerUrl(cluster), ClusterHelper.NO_USER_BROKER_URL); - } - - @Test(expectedExceptions = ValidationException.class, - expectedExceptionsMessageRegExp = ".*java.net.UnknownHostException.*") - public void testParseClusterWithBadWriteInterface() throws Exception { - InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-bad-write-endpoint.xml"); - Cluster cluster = parser.parse(stream); - parser.validate(cluster); - } - - @Test - public void testParseClusterWithBadRegistry() throws Exception { - // disable catalog service - StartupProperties.get().remove(CatalogServiceFactory.CATALOG_SERVICE); - Assert.assertFalse(CatalogServiceFactory.isEnabled()); - - InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-bad-registry.xml"); - Cluster cluster = parser.parse(stream); - - Interface catalog = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY); - Assert.assertEquals(catalog.getEndpoint(), "Hcat"); - Assert.assertEquals(catalog.getVersion(), "0.1"); - } - - @Test - public void testValidateClusterProperties() throws Exception { - ClusterEntityParser clusterEntityParser = Mockito - .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); - InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-0.1.xml"); - Cluster cluster = parser.parse(stream); - - Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateLocations(cluster); - - // Good set of properties, should work - clusterEntityParser.validateProperties(cluster); - - // add duplicate property, should throw validation exception. - Property property1 = new Property(); - property1.setName("field1"); - property1.setValue("any value"); - cluster.getProperties().getProperties().add(property1); - try { - clusterEntityParser.validate(cluster); - Assert.fail(); // should not reach here - } catch (ValidationException e) { - // Do nothing - } - - // Remove duplicate property. It should not throw exception anymore - cluster.getProperties().getProperties().remove(property1); - clusterEntityParser.validateProperties(cluster); - - // add empty property name, should throw validation exception. - property1.setName(""); - cluster.getProperties().getProperties().add(property1); - try { - clusterEntityParser.validateProperties(cluster); - Assert.fail(); // should not reach here - } catch (ValidationException e) { - // Do nothing - } - - } - - /** - * A positive test for validating tags key value pair regex: key=value, key=value. - * @throws FalconException - */ - @Test - public void testClusterTags() throws FalconException { - InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML); - Cluster cluster = parser.parse(stream); - - final String tags = cluster.getTags(); - Assert.assertEquals("[email protected], [email protected], _department_type=forecasting", tags); - - final String[] keys = {"consumer", "owner", "_department_type", }; - final String[] values = {"[email protected]", "[email protected]", "forecasting", }; - - final String[] pairs = tags.split(","); - Assert.assertEquals(3, pairs.length); - for (int i = 0; i < pairs.length; i++) { - String pair = pairs[i].trim(); - String[] parts = pair.split("="); - Assert.assertEquals(keys[i], parts[0]); - Assert.assertEquals(values[i], parts[1]); - } - } - - @Test - public void testValidateACLWithNoACLAndAuthorizationEnabled() throws Exception { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true"); - Assert.assertTrue(Boolean.valueOf( - StartupProperties.get().getProperty("falcon.security.authorization.enabled"))); - - try { - InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML); - - // need a new parser since it caches authorization enabled flag - ClusterEntityParser clusterEntityParser = - (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER); - Cluster cluster = clusterEntityParser.parse(stream); - Assert.assertNotNull(cluster); - Assert.assertNull(cluster.getACL()); - } finally { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false"); - } - } - - @Test - public void testValidateACLAuthorizationEnabled() throws Exception { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true"); - Assert.assertTrue(Boolean.valueOf( - StartupProperties.get().getProperty("falcon.security.authorization.enabled"))); - - try { - InputStream stream = this.getClass().getResourceAsStream("/config/cluster/cluster-no-registry.xml"); - - // need a new parser since it caches authorization enabled flag - ClusterEntityParser clusterEntityParser = - (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER); - Cluster cluster = clusterEntityParser.parse(stream); - Assert.assertNotNull(cluster); - Assert.assertNotNull(cluster.getACL()); - Assert.assertNotNull(cluster.getACL().getOwner()); - Assert.assertNotNull(cluster.getACL().getGroup()); - } finally { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false"); - } - } - - /** - * A lightweight unit test for a cluster where location type staging is missing. - * Extensive tests are found in ClusterEntityValidationIT. - * - * @throws ValidationException - */ - @Test(expectedExceptions = ValidationException.class, expectedExceptionsMessageRegExp = ".*Unable to find.*") - public void testClusterWithoutStaging() throws Exception { - ClusterEntityParser clusterEntityParser = Mockito - .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); - Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy(); - Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster); - Locations locations = getClusterLocations(null, "non/existent/path"); - cluster.setLocations(locations); - clusterEntityParser.validate(cluster); - Assert.fail("Should have thrown a validation exception"); - } - - /** - * A lightweight unit test for a cluster where location paths are invalid. - * Extensive tests are found in ClusterEntityValidationIT. - * - * @throws ValidationException - */ - @Test(expectedExceptions = ValidationException.class, expectedExceptionsMessageRegExp = ".*Location.*must exist.") - public void testClusterWithInvalidLocationsPaths() throws Exception { - ClusterEntityParser clusterEntityParser = Mockito - .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); - Cluster cluster = (Cluster)this.dfsCluster.getCluster().copy(); - Locations locations = getClusterLocations("non/existent/path", null); - cluster.setLocations(locations); - Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster); - clusterEntityParser.validate(cluster); - Assert.fail("Should have thrown a validation exception"); - } - - /** - * A lightweight unit test for a cluster where location paths are same. - * Extensive tests are found in ClusterEntityValidationIT. - * - * @throws ValidationException - */ - @Test(expectedExceptions = ValidationException.class, expectedExceptionsMessageRegExp = ".*same path:.*") - public void testClusterWithSameWorkingAndStaging() throws Exception { - ClusterEntityParser clusterEntityParser = Mockito - .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); - Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy(); - Locations locations = getClusterLocations("staging1", "staging1"); - cluster.setLocations(locations); - this.dfsCluster.getFileSystem().mkdirs(new Path(cluster.getLocations().getLocations().get(0).getPath()), - HadoopClientFactory.ALL_PERMISSION); - Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster); - clusterEntityParser.validate(cluster); - Assert.fail("Should have thrown a validation exception"); - } - - /** - * A lightweight unit test for a cluster where location type working is missing. - * It should automatically get generated - * Extensive tests are found in ClusterEntityValidationIT. - */ - @Test - public void testClusterWithOnlyStaging() throws Exception { - ClusterEntityParser clusterEntityParser = Mockito - .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); - Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy(); - Locations locations = getClusterLocations("staging2", null); - cluster.setLocations(locations); - Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster); - this.dfsCluster.getFileSystem().mkdirs(new Path(ClusterHelper.getLocation(cluster, - ClusterLocationType.STAGING).getPath()), HadoopClientFactory.ALL_PERMISSION); - clusterEntityParser.validate(cluster); - String workingDirPath = cluster.getLocations().getLocations().get(0).getPath() + "/working"; - Assert.assertEquals(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), workingDirPath); - FileStatus workingDirStatus = this.dfsCluster.getFileSystem().getFileLinkStatus(new Path(workingDirPath)); - Assert.assertTrue(workingDirStatus.isDirectory()); - Assert.assertEquals(workingDirStatus.getPermission(), HadoopClientFactory.READ_EXECUTE_PERMISSION); - Assert.assertEquals(workingDirStatus.getOwner(), UserGroupInformation.getLoginUser().getShortUserName()); - - String stagingSubdirFeed = cluster.getLocations().getLocations().get(0).getPath() + "/falcon/workflows/feed"; - String stagingSubdirProcess = - cluster.getLocations().getLocations().get(0).getPath() + "/falcon/workflows/process"; - FileStatus stagingSubdirFeedStatus = - this.dfsCluster.getFileSystem().getFileLinkStatus(new Path(stagingSubdirFeed)); - FileStatus stagingSubdirProcessStatus = - this.dfsCluster.getFileSystem().getFileLinkStatus(new Path(stagingSubdirProcess)); - Assert.assertTrue(stagingSubdirFeedStatus.isDirectory()); - Assert.assertEquals(stagingSubdirFeedStatus.getPermission(), HadoopClientFactory.ALL_PERMISSION); - Assert.assertTrue(stagingSubdirProcessStatus.isDirectory()); - Assert.assertEquals(stagingSubdirProcessStatus.getPermission(), HadoopClientFactory.ALL_PERMISSION); - } - - /** - * A lightweight unit test for a cluster where location working is not there and staging - * has a subdir which will be used by cluster as working. - * Checking for wrong perms of this subdir - * Extensive tests are found in ClusterEntityValidationIT. - * - * @throws ValidationException - */ - @Test(expectedExceptions = ValidationException.class, expectedExceptionsMessageRegExp = ".*rwxr-xr-x.*rwxrwxrwx") - public void testClusterWithSubdirInStaging() throws Exception { - ClusterEntityParser clusterEntityParser = Mockito - .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); - Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy(); - Locations locations = getClusterLocations("staging3", null); - cluster.setLocations(locations); - - HadoopClientFactory.mkdirs(this.dfsCluster.getFileSystem(), - new Path(cluster.getLocations().getLocations().get(0).getPath()), - HadoopClientFactory.ALL_PERMISSION); - HadoopClientFactory.mkdirs(this.dfsCluster.getFileSystem(), - new Path(cluster.getLocations().getLocations().get(0).getPath() + "/working"), - HadoopClientFactory.ALL_PERMISSION); - - Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster); - clusterEntityParser.validate(cluster); - Assert.fail("Should have thrown a validation exception"); - } - - /** - * A lightweight unit test for a cluster where staging location - * does not have ALL_PERMISSION (777). - * Staging has permission less than ALL_PERMISSION - * ValidationException should be thrown - * - * @throws ValidationException - */ - @Test(expectedExceptions = ValidationException.class, expectedExceptionsMessageRegExp = ".*rwxr-xr-x.*rwxrwxrwx") - public void testClusterWithStagingPermission() throws Exception { - ClusterEntityParser clusterEntityParser = Mockito - .spy((ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER)); - Cluster cluster = (Cluster) this.dfsCluster.getCluster().copy(); - Locations locations = getClusterLocations("staging4", null); - cluster.setLocations(locations); - Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster); - Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster); - this.dfsCluster.getFileSystem().mkdirs(new Path(ClusterHelper.getLocation(cluster, - ClusterLocationType.STAGING).getPath()), HadoopClientFactory.READ_EXECUTE_PERMISSION); - clusterEntityParser.validate(cluster); - Assert.fail("Should have thrown a validation exception"); - } - - @BeforeClass - public void init() throws Exception { - this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); - this.conf = dfsCluster.getConf(); - this.dfsCluster.getFileSystem().mkdirs(new Path(CLUSTER_LOCATIONS_BASE_DIR)); - } - - @AfterClass - public void tearDown() throws IOException { - this.dfsCluster.getFileSystem().delete(new Path(CLUSTER_LOCATIONS_BASE_DIR), true); - this.dfsCluster.shutdown(); - } - - private Locations getClusterLocations(String staging, String working) { - Locations locations = new Locations(); - - Location loc = new Location(); - loc.setName(ClusterLocationType.STAGING); - if (StringUtils.isNotEmpty(staging)) { - loc.setPath(CLUSTER_LOCATIONS_BASE_DIR + staging); - locations.getLocations().add(loc); - } - - loc = new Location(); - loc.setName(ClusterLocationType.WORKING); - if (StringUtils.isNotEmpty(working)) { - loc.setPath(CLUSTER_LOCATIONS_BASE_DIR + working); - locations.getLocations().add(loc); - } - - return locations; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java deleted file mode 100644 index 3893917..0000000 --- a/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity.parser; - -import org.apache.falcon.cluster.util.EmbeddedCluster; -import org.apache.falcon.entity.AbstractTestBase; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.datasource.Datasource; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.io.InputStream; - -/** - * Test class for Datasource Entity. - */ -public class DatasourceEntityParserTest extends AbstractTestBase { - - private EmbeddedCluster cluster; - private String hdfsUrl; - - private final DatasourceEntityParser datasourceEntityParser = - (DatasourceEntityParser) EntityParserFactory.getParser(EntityType.DATASOURCE); - private final FeedEntityParser feedEntityParser = - (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED); - - @BeforeClass - public void start() throws Exception { - cluster = EmbeddedCluster.newCluster("test"); - hdfsUrl = cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY); - } - - @AfterClass - public void close() throws Exception { - cluster.shutdown(); - } - - @BeforeMethod - public void setup() throws Exception { - cleanupStore(); - } - - @Test - public void testDatasourceEntity() throws Exception { - - InputStream stream = this.getClass().getResourceAsStream("/config/datasource/datasource-0.1.xml"); - Datasource datasource = datasourceEntityParser.parse(stream); - - ConfigurationStore store = ConfigurationStore.get(); - store.publish(EntityType.DATASOURCE, datasource); - - Datasource databaseEntity = EntityUtil.getEntity(EntityType.DATASOURCE, datasource.getName()); - Assert.assertEquals("test-hsql-db", datasource.getName()); - Assert.assertEquals("test-hsql-db", databaseEntity.getName()); - Assert.assertEquals("hsql", databaseEntity.getType().value()); - Assert.assertEquals("org.hsqldb.jdbcDriver", databaseEntity.getDriver().getClazz()); - } - - @Test - public void testDatasourcePasswordFileEntity() throws Exception { - - InputStream stream = this.getClass().getResourceAsStream("/config/datasource/datasource-file-0.1.xml"); - Datasource datasource = datasourceEntityParser.parse(stream); - ConfigurationStore store = ConfigurationStore.get(); - store.publish(EntityType.DATASOURCE, datasource); - - Datasource databaseEntity = EntityUtil.getEntity(EntityType.DATASOURCE, datasource.getName()); - Assert.assertEquals("test-hsql-db", datasource.getName()); - Assert.assertEquals("test-hsql-db", databaseEntity.getName()); - Assert.assertEquals("hsql", databaseEntity.getType().value()); - Assert.assertEquals("org.hsqldb.jdbcDriver", databaseEntity.getDriver().getClazz()); - } -}
