http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java deleted file mode 100644 index 450b251..0000000 --- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java +++ /dev/null @@ -1,1080 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.parser.EntityParserFactory; -import org.apache.falcon.entity.parser.FeedEntityParser; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.cluster.Properties; -import org.apache.falcon.entity.v0.cluster.Property; -import org.apache.falcon.entity.v0.feed.Argument; -import org.apache.falcon.entity.v0.feed.Arguments; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.entity.v0.feed.Clusters; -import org.apache.falcon.entity.v0.feed.Extract; -import org.apache.falcon.entity.v0.feed.ExtractMethod; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.FieldIncludeExclude; -import org.apache.falcon.entity.v0.feed.FieldsType; -import org.apache.falcon.entity.v0.feed.Import; -import org.apache.falcon.entity.v0.feed.Lifecycle; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.entity.v0.feed.Locations; -import org.apache.falcon.entity.v0.feed.MergeType; -import org.apache.falcon.entity.v0.feed.RetentionStage; -import org.apache.falcon.entity.v0.feed.Datasource; -import org.apache.falcon.entity.v0.feed.Validity; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Inputs; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Outputs; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.resource.SchedulableEntityInstance; -import org.apache.falcon.service.LifecyclePolicyMap; -import org.apache.falcon.util.DateUtil; -import org.apache.hadoop.fs.Path; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.TimeZone; - -/** - * Test for feed helper methods. - */ -public class FeedHelperTest extends AbstractTestBase { - private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); - private ConfigurationStore store; - - @BeforeClass - public void init() throws Exception { - initConfigStore(); - LifecyclePolicyMap.get().init(); - } - - @BeforeMethod - public void setUp() throws Exception { - cleanupStore(); - store = getStore(); - } - - @Test - public void testPartitionExpression() { - Assert.assertEquals(FeedHelper.normalizePartitionExpression(" /a// ", " /b// "), "a/b"); - Assert.assertEquals(FeedHelper.normalizePartitionExpression(null, " /b// "), "b"); - Assert.assertEquals(FeedHelper.normalizePartitionExpression(null, null), ""); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void testInstanceBeforeStart() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); - Outputs outputs = new Outputs(); - Output outFeed = new Output(); - outFeed.setName("outputFeed"); - outFeed.setFeed(feed.getName()); - outFeed.setInstance("now(0,0)"); - outputs.getOutputs().add(outFeed); - process.setOutputs(outputs); - store.publish(EntityType.PROCESS, process); - FeedHelper.getProducerInstance(feed, getDate("2011-02-27 10:00 UTC"), cluster); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void testInstanceEqualsEnd() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); - Outputs outputs = new Outputs(); - Output outFeed = new Output(); - outFeed.setName("outputFeed"); - outFeed.setFeed(feed.getName()); - outFeed.setInstance("now(0,0)"); - outputs.getOutputs().add(outFeed); - process.setOutputs(outputs); - store.publish(EntityType.PROCESS, process); - FeedHelper.getProducerInstance(feed, getDate("2016-02-28 10:00 UTC"), cluster); - } - - @Test(expectedExceptions = IllegalArgumentException.class) - public void testInstanceOutOfSync() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); - Outputs outputs = new Outputs(); - Output outFeed = new Output(); - outFeed.setName("outputFeed"); - outFeed.setFeed(feed.getName()); - outFeed.setInstance("now(0,0)"); - outputs.getOutputs().add(outFeed); - process.setOutputs(outputs); - store.publish(EntityType.PROCESS, process); - FeedHelper.getProducerInstance(feed, getDate("2016-02-28 09:04 UTC"), cluster); - } - - @Test - public void testInvalidProducerInstance() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); - Outputs outputs = new Outputs(); - Output outFeed = new Output(); - outFeed.setName("outputFeed"); - outFeed.setFeed(feed.getName()); - outFeed.setInstance("now(0,0)"); - outputs.getOutputs().add(outFeed); - process.setOutputs(outputs); - store.publish(EntityType.PROCESS, process); - Assert.assertNull(FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:40 UTC"), cluster)); - } - - @Test - public void testGetProducerOutOfValidity() throws FalconException, ParseException { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); - Outputs outputs = new Outputs(); - Output outFeed = new Output(); - outFeed.setName("outputFeed"); - outFeed.setFeed(feed.getName()); - outFeed.setInstance("now(0,0)"); - outputs.getOutputs().add(outFeed); - process.setOutputs(outputs); - store.publish(EntityType.PROCESS, process); - Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); - SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:45 UTC"), - cluster); - Assert.assertNull(result); - } - - @Test - public void testGetConsumersOutOfValidity() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("inputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("now(0, -20)"); - inFeed.setEnd("now(0, 0)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2016-02-28 09:00 UTC"), - cluster); - Assert.assertTrue(result.isEmpty()); - } - - @Test - public void testGetFeedValidityStartAndNextInstance() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Date date = FeedHelper.getFeedValidityStart(feed, cluster.getName()); - Assert.assertEquals(DateUtil.getDateFormatFromTime(date.getTime()), "2011-02-28T10:00Z"); - Date nextDate = FeedHelper.getNextFeedInstanceDate(date, feed); - Assert.assertEquals(DateUtil.getDateFormatFromTime(nextDate.getTime()), "2011-02-28T10:05Z"); - } - - - @Test - public void testGetConsumersFirstInstance() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2012-02-28 10:47 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("inputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("now(0, -20)"); - inFeed.setEnd("now(0, 0)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2012-02-28 10:15 UTC"), - cluster); - Set<SchedulableEntityInstance> expected = new HashSet<>(); - SchedulableEntityInstance consumer = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate("2012-02-28 10:37 UTC"), EntityType.PROCESS); - consumer.setTags(SchedulableEntityInstance.INPUT); - expected.add(consumer); - Assert.assertEquals(result, expected); - } - - @Test - public void testGetConsumersLastInstance() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:20 UTC", "2016-02-28 10:00 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("inputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("now(0, -20)"); - inFeed.setEnd("now(0, 0)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2012-02-28 10:15 UTC"), - cluster); - Set<SchedulableEntityInstance> expected = new HashSet<>(); - String[] consumers = { "2012-02-28 10:20 UTC", "2012-02-28 10:30 UTC", }; - for (String d : consumers) { - SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate(d), EntityType.PROCESS); - i.setTags(SchedulableEntityInstance.INPUT); - expected.add(i); - } - Assert.assertEquals(result, expected); - } - - @Test - public void testGetPolicies() throws Exception { - FeedEntityParser parser = (FeedEntityParser) EntityParserFactory - .getParser(EntityType.FEED); - Feed feed = parser.parse(this.getClass().getResourceAsStream(FEED3_XML)); - List<String> policies = FeedHelper.getPolicies(feed, "testCluster"); - Assert.assertEquals(policies.size(), 1); - Assert.assertEquals(policies.get(0), "AgeBasedDelete"); - } - - @Test - public void testFeedWithNoDependencies() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, getDate("2016-02-28 09:00 UTC"), - cluster); - Assert.assertTrue(result.isEmpty()); - SchedulableEntityInstance res = FeedHelper.getProducerInstance(feed, getDate("2012-02-28 10:45 UTC"), - cluster); - Assert.assertNull(res); - } - - @Test - public void testEvaluateExpression() throws Exception { - Cluster cluster = new Cluster(); - cluster.setName("name"); - cluster.setColo("colo"); - cluster.setProperties(new Properties()); - Property prop = new Property(); - prop.setName("pname"); - prop.setValue("pvalue"); - cluster.getProperties().getProperties().add(prop); - - Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "${cluster.colo}/*/US"), "colo/*/US"); - Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "${cluster.name}/*/${cluster.pname}"), - "name/*/pvalue"); - Assert.assertEquals(FeedHelper.evaluateClusterExp(cluster, "IN"), "IN"); - } - - @DataProvider(name = "fsPathsforDate") - public Object[][] createPathsForGetDate() { - final TimeZone utc = TimeZone.getTimeZone("UTC"); - final TimeZone pacificTime = TimeZone.getTimeZone("America/Los_Angeles"); - final TimeZone ist = TimeZone.getTimeZone("IST"); - - return new Object[][] { - {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}", "/data/2015/01/01/00/30", utc, "2015-01-01T00:30Z"}, - {"/data/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}", "/data/2015-01-01-01-00", utc, "2015-01-01T01:00Z"}, - {"/data/${YEAR}/${MONTH}/${DAY}", "/data/2015/01/01", utc, "2015-01-01T00:00Z"}, - {"/data/${YEAR}/${MONTH}/${DAY}/data", "/data/2015/01/01/data", utc, "2015-01-01T00:00Z"}, - {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}", "/data/2015-01-01/00/30", utc, null}, - {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/data", "/data/2015-01-01/00/30", utc, null}, - {"/d/${YEAR}/${MONTH}/${DAY}/${HOUR}/data", "/d/2015/05/25/00/data/{p1}/p2", utc, "2015-05-25T00:00Z"}, - {"/data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/data", "/data/2015/05/25/00/00/{p1}/p2", utc, null}, - {"/d/${YEAR}/${MONTH}/M", "/d/2015/11/M", utc, "2015-11-01T00:00Z"}, - {"/d/${YEAR}/${MONTH}/${DAY}/M", "/d/2015/11/02/M", utc, "2015-11-02T00:00Z"}, - {"/d/${YEAR}/${MONTH}/${DAY}/${HOUR}/M", "/d/2015/11/01/04/M", utc, "2015-11-01T04:00Z"}, - {"/d/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/M", "/d/2015/11/01/04/15/M", utc, "2015-11-01T04:15Z"}, - {"/d/${YEAR}/${MONTH}/M", "/d/2015/11/M", pacificTime, "2015-11-01T07:00Z"}, - {"/d/${YEAR}/${MONTH}/${DAY}/M", "/d/2015/11/02/M", pacificTime, "2015-11-02T08:00Z"}, - {"/d/${YEAR}/${MONTH}/${DAY}/${HOUR}/M", "/d/2015/11/01/04/M", pacificTime, "2015-11-01T12:00Z"}, - {"/d/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/M", "/d/2015/11/01/04/15/M", ist, "2015-10-31T22:45Z"}, - }; - } - - @Test(dataProvider = "fsPathsforDate") - public void testGetDateFromPath(String template, String path, TimeZone tz, String expectedDate) throws Exception { - Date date = FeedHelper.getDate(template, new Path(path), tz); - Assert.assertEquals(SchemaHelper.formatDateUTC(date), expectedDate); - } - - @Test - public void testGetLocations() { - Cluster cluster = new Cluster(); - cluster.setName("name"); - Feed feed = new Feed(); - Location location1 = new Location(); - location1.setType(LocationType.META); - Locations locations = new Locations(); - locations.getLocations().add(location1); - - Location location2 = new Location(); - location2.setType(LocationType.DATA); - locations.getLocations().add(location2); - - org.apache.falcon.entity.v0.feed.Cluster feedCluster = new org.apache.falcon.entity.v0.feed.Cluster(); - feedCluster.setName("name"); - - feed.setLocations(locations); - Clusters clusters = new Clusters(); - feed.setClusters(clusters); - feed.getClusters().getClusters().add(feedCluster); - - Assert.assertEquals(FeedHelper.getLocations(feedCluster, feed), - locations.getLocations()); - Assert.assertEquals(FeedHelper.getLocation(feed, cluster, LocationType.DATA), location2); - } - - @Test - public void testGetProducerProcessWithOffset() throws FalconException, ParseException { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Assert.assertNull(FeedHelper.getProducerProcess(feed)); - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 10:37 UTC", "2016-02-28 10:37 UTC"); - Outputs outputs = new Outputs(); - Output outFeed = new Output(); - outFeed.setName("outputFeed"); - outFeed.setFeed(feed.getName()); - outFeed.setInstance("now(0,0)"); - outputs.getOutputs().add(outFeed); - process.setOutputs(outputs); - store.publish(EntityType.PROCESS, process); - Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); - SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:35 UTC"), - cluster); - SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate("2013-02-28 10:37 UTC"), EntityType.PROCESS); - expected.setTags(SchedulableEntityInstance.OUTPUT); - Assert.assertEquals(result, expected); - } - - @Test - public void testGetProducerProcessForNow() throws FalconException, ParseException { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Assert.assertNull(FeedHelper.getProducerProcess(feed)); - - // create it's producer process submit it, test it's ProducerProcess - Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Outputs outputs = new Outputs(); - Output outFeed = new Output(); - outFeed.setName("outputFeed"); - outFeed.setFeed(feed.getName()); - outFeed.setInstance("now(0,0)"); - outputs.getOutputs().add(outFeed); - process.setOutputs(outputs); - store.publish(EntityType.PROCESS, process); - - Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); - SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:00 UTC"), - cluster); - SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); - expected.setTags(SchedulableEntityInstance.OUTPUT); - Assert.assertEquals(result, expected); - } - - @Test - public void testGetProducerWithNowNegativeOffset() throws FalconException, ParseException { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Assert.assertNull(FeedHelper.getProducerProcess(feed)); - - // create it's producer process submit it, test it's ProducerProcess - Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Outputs outputs = new Outputs(); - Output outFeed = new Output(); - outFeed.setName("outputFeed"); - outFeed.setFeed(feed.getName()); - outFeed.setInstance("now(-4,0)"); - outputs.getOutputs().add(outFeed); - process.setOutputs(outputs); - store.publish(EntityType.PROCESS, process); - - Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); - SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-27 10:00 UTC"), - cluster); - SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); - expected.setTags(SchedulableEntityInstance.OUTPUT); - Assert.assertEquals(result, expected); - } - - - @Test - public void testGetProducerWithNowPositiveOffset() throws FalconException, ParseException { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Assert.assertNull(FeedHelper.getProducerProcess(feed)); - - // create it's producer process submit it, test it's ProducerProcess - Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Outputs outputs = new Outputs(); - Output outFeed = new Output(); - outFeed.setName("outputFeed"); - outFeed.setFeed(feed.getName()); - outFeed.setInstance("now(4,0)"); - outputs.getOutputs().add(outFeed); - process.setOutputs(outputs); - store.publish(EntityType.PROCESS, process); - - Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); - SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 10:00 UTC"), - cluster); - SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); - expected.setTags(SchedulableEntityInstance.OUTPUT); - Assert.assertEquals(result, expected); - } - - @Test - public void testGetProducerProcessInstance() throws FalconException, ParseException { - //create a feed, submit it, test that ProducerProcess is null - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "days(1)", "2011-02-28 00:00 UTC", "2016-02-28 10:00 UTC"); - - // create it's producer process submit it, test it's ProducerProcess - Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Outputs outputs = new Outputs(); - Output outFeed = new Output(); - outFeed.setName("outputFeed"); - outFeed.setFeed(feed.getName()); - outFeed.setInstance("today(0,0)"); - outputs.getOutputs().add(outFeed); - process.setOutputs(outputs); - store.publish(EntityType.PROCESS, process); - Assert.assertEquals(FeedHelper.getProducerProcess(feed).getName(), process.getName()); - SchedulableEntityInstance result = FeedHelper.getProducerInstance(feed, getDate("2013-02-28 00:00 UTC"), - cluster); - SchedulableEntityInstance expected = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate("2013-02-28 10:00 UTC"), EntityType.PROCESS); - expected.setTags(SchedulableEntityInstance.OUTPUT); - Assert.assertEquals(result, expected); - } - - @Test - public void testGetConsumerProcesses() throws FalconException, ParseException { - //create a feed, submit it, test that ConsumerProcesses is blank list - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - - //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses - Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("outputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("today(0,0)"); - inFeed.setEnd("today(0,0)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - - Set<Process> result = FeedHelper.getConsumerProcesses(feed); - Assert.assertEquals(result.size(), 1); - Assert.assertTrue(result.contains(process)); - } - - @Test - public void testGetConsumerProcessInstances() throws Exception { - //create a feed, submit it, test that ConsumerProcesses is blank list - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "hours(1)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); - - //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses - Process process = prepareProcess(cluster, "days(1)", "2012-02-28 10:00 UTC", "2016-02-28 10:00 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("inputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("now(-4, 30)"); - inFeed.setEnd("now(4, 30)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, - getDate("2012-02-28 09:00 UTC"), cluster); - Assert.assertEquals(result.size(), 1); - - Set<SchedulableEntityInstance> expected = new HashSet<>(); - SchedulableEntityInstance ins = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate("2012-02-28 10:00 UTC"), EntityType.PROCESS); - ins.setTags(SchedulableEntityInstance.INPUT); - expected.add(ins); - Assert.assertEquals(result, expected); - } - - @Test - public void testGetConsumerProcessInstancesWithNonUnitFrequency() throws Exception { - //create a feed, submit it, test that ConsumerProcesses is blank list - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2012-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); - - //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2016-02-28 10:00 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("inputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("now(0, -20)"); - inFeed.setEnd("now(0,0)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, - getDate("2012-02-28 09:40 UTC"), cluster); - - Set<SchedulableEntityInstance> expected = new HashSet<>(); - String[] consumers = {"2012-02-28 09:47 UTC", "2012-02-28 09:57 UTC"}; - for (String d : consumers) { - SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate(d), EntityType.PROCESS); - i.setTags(SchedulableEntityInstance.INPUT); - expected.add(i); - } - Assert.assertEquals(result, expected); - } - - @Test - public void testGetConsumersOutOfValidityRange() throws Exception { - //create a feed, submit it, test that ConsumerProcesses is blank list - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2010-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); - - //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2016-02-28 10:00 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("inputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("now(0, -20)"); - inFeed.setEnd("now(0,0)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, - getDate("2010-02-28 09:40 UTC"), cluster); - Assert.assertEquals(result.size(), 0); - } - - @Test - public void testGetConsumersLargeOffsetShortValidity() throws Exception { - //create a feed, submit it, test that ConsumerProcesses is blank list - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "minutes(5)", "2010-02-28 00:00 UTC", "2016-02-28 00:00 UTC"); - - //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses - Process process = prepareProcess(cluster, "minutes(10)", "2012-02-28 09:37 UTC", "2012-02-28 09:47 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("inputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("today(-2, 0)"); - inFeed.setEnd("now(0,0)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, - getDate("2012-02-28 09:35 UTC"), cluster); - Set<SchedulableEntityInstance> expected = new HashSet<>(); - SchedulableEntityInstance consumer = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate("2012-02-28 09:37 UTC"), EntityType.PROCESS); - consumer.setTags(SchedulableEntityInstance.INPUT); - expected.add(consumer); - Assert.assertEquals(result, expected); - } - - @Test - public void testGetMultipleConsumerInstances() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC"); - Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("inputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("now(-4, 30)"); - inFeed.setEnd("now(4, 30)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, - getDate("2012-02-28 09:00 UTC"), cluster); - Assert.assertEquals(result.size(), 9); - Set<SchedulableEntityInstance> expected = new HashSet<>(); - String[] consumers = { "2012-02-28 05:00 UTC", "2012-02-28 06:00 UTC", "2012-02-28 07:00 UTC", - "2012-02-28 08:00 UTC", "2012-02-28 09:00 UTC", "2012-02-28 10:00 UTC", "2012-02-28 11:00 UTC", - "2012-02-28 12:00 UTC", "2012-02-28 13:00 UTC", }; - for (String d : consumers) { - SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate(d), EntityType.PROCESS); - i.setTags(SchedulableEntityInstance.INPUT); - expected.add(i); - } - Assert.assertEquals(result, expected); - } - - @Test - public void testGetConsumerWithVariableEnd() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC"); - - //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses - Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("inputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("today(0, 0)"); - inFeed.setEnd("now(0, 0)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, - getDate("2012-02-28 00:00 UTC"), cluster); - Set<SchedulableEntityInstance> expected = new HashSet<>(); - String[] consumers = {"2012-02-28 11:00 UTC", "2012-02-28 16:00 UTC", "2012-02-28 18:00 UTC", - "2012-02-28 20:00 UTC", "2012-02-28 13:00 UTC", "2012-02-28 03:00 UTC", "2012-02-28 04:00 UTC", - "2012-02-28 06:00 UTC", "2012-02-28 05:00 UTC", "2012-02-28 17:00 UTC", "2012-02-28 00:00 UTC", - "2012-02-28 23:00 UTC", "2012-02-28 21:00 UTC", "2012-02-28 15:00 UTC", "2012-02-28 22:00 UTC", - "2012-02-28 14:00 UTC", "2012-02-28 08:00 UTC", "2012-02-28 12:00 UTC", "2012-02-28 02:00 UTC", - "2012-02-28 01:00 UTC", "2012-02-28 19:00 UTC", "2012-02-28 10:00 UTC", "2012-02-28 09:00 UTC", - "2012-02-28 07:00 UTC", }; - for (String d : consumers) { - SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate(d), EntityType.PROCESS); - i.setTags(SchedulableEntityInstance.INPUT); - expected.add(i); - } - Assert.assertEquals(result, expected); - } - - @Test - public void testGetConsumerWithVariableStart() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC"); - - //create a consumer Process and submit it, assert that this is returned in ConsumerProcesses - Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("inputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("now(0, 0)"); - inFeed.setEnd("today(24, 0)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, - getDate("2012-03-28 00:00 UTC"), cluster); - Set<SchedulableEntityInstance> expected = new HashSet<>(); - String[] consumers = {"2012-03-27 16:00 UTC", "2012-03-27 01:00 UTC", "2012-03-27 10:00 UTC", - "2012-03-27 03:00 UTC", "2012-03-27 08:00 UTC", "2012-03-27 07:00 UTC", "2012-03-27 19:00 UTC", - "2012-03-27 22:00 UTC", "2012-03-27 12:00 UTC", "2012-03-27 20:00 UTC", "2012-03-27 09:00 UTC", - "2012-03-27 04:00 UTC", "2012-03-27 14:00 UTC", "2012-03-27 05:00 UTC", "2012-03-27 23:00 UTC", - "2012-03-27 17:00 UTC", "2012-03-27 13:00 UTC", "2012-03-27 18:00 UTC", "2012-03-27 15:00 UTC", - "2012-03-28 00:00 UTC", "2012-03-27 02:00 UTC", "2012-03-27 11:00 UTC", "2012-03-27 21:00 UTC", - "2012-03-27 00:00 UTC", "2012-03-27 06:00 UTC", }; - for (String d : consumers) { - SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate(d), EntityType.PROCESS); - i.setTags(SchedulableEntityInstance.INPUT); - expected.add(i); - } - Assert.assertEquals(result, expected); - } - - @Test - public void testGetConsumerWithLatest() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "hours(1)", "2012-02-27 00:00 UTC", "2016-02-28 00:00 UTC"); - Process process = prepareProcess(cluster, "hours(1)", "2012-02-27 10:00 UTC", "2016-02-28 10:00 UTC"); - Inputs inputs = new Inputs(); - Input inFeed = new Input(); - inFeed.setName("inputFeed"); - inFeed.setFeed(feed.getName()); - inFeed.setStart("today(0, 0)"); - inFeed.setEnd("latest(0)"); - inputs.getInputs().add(inFeed); - process.setInputs(inputs); - store.publish(EntityType.PROCESS, process); - - Set<SchedulableEntityInstance> result = FeedHelper.getConsumerInstances(feed, - getDate("2012-02-28 00:00 UTC"), cluster); - Set<SchedulableEntityInstance> expected = new HashSet<>(); - String[] consumers = {"2012-02-28 23:00 UTC", "2012-02-28 04:00 UTC", "2012-02-28 10:00 UTC", - "2012-02-28 07:00 UTC", "2012-02-28 17:00 UTC", "2012-02-28 13:00 UTC", "2012-02-28 05:00 UTC", - "2012-02-28 22:00 UTC", "2012-02-28 03:00 UTC", "2012-02-28 21:00 UTC", "2012-02-28 11:00 UTC", - "2012-02-28 20:00 UTC", "2012-02-28 06:00 UTC", "2012-02-28 01:00 UTC", "2012-02-28 14:00 UTC", - "2012-02-28 00:00 UTC", "2012-02-28 18:00 UTC", "2012-02-28 12:00 UTC", "2012-02-28 16:00 UTC", - "2012-02-28 09:00 UTC", "2012-02-28 15:00 UTC", "2012-02-28 19:00 UTC", "2012-02-28 08:00 UTC", - "2012-02-28 02:00 UTC", }; - for (String d : consumers) { - SchedulableEntityInstance i = new SchedulableEntityInstance(process.getName(), cluster.getName(), - getDate(d), EntityType.PROCESS); - i.setTags(SchedulableEntityInstance.INPUT); - expected.add(i); - } - Assert.assertEquals(result, expected); - } - - @Test - public void testIsLifeCycleEnabled() throws Exception { - Feed feed = new Feed(); - - // lifecycle is not defined - Clusters clusters = new Clusters(); - org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster(); - cluster.setName("cluster1"); - clusters.getClusters().add(cluster); - feed.setClusters(clusters); - Assert.assertFalse(FeedHelper.isLifecycleEnabled(feed, cluster.getName())); - - // lifecycle is defined at global level - Lifecycle globalLifecycle = new Lifecycle(); - RetentionStage retentionStage = new RetentionStage(); - retentionStage.setFrequency(new Frequency("hours(2)")); - globalLifecycle.setRetentionStage(retentionStage); - feed.setLifecycle(globalLifecycle); - Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName())); - - // lifecycle is defined at both global and cluster level - Lifecycle clusterLifecycle = new Lifecycle(); - retentionStage = new RetentionStage(); - retentionStage.setFrequency(new Frequency("hours(4)")); - clusterLifecycle.setRetentionStage(retentionStage); - feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle); - Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName())); - - // lifecycle is defined only at cluster level - feed.setLifecycle(null); - Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName())); - } - - @Test - public void testGetRetentionStage() throws Exception { - Feed feed = new Feed(); - feed.setFrequency(new Frequency("days(1)")); - - // retention stage frequency is not defined - Lifecycle globalLifecycle = new Lifecycle(); - RetentionStage globalRetentionStage = new RetentionStage(); - globalLifecycle.setRetentionStage(globalRetentionStage); - feed.setLifecycle(globalLifecycle); - - Clusters clusters = new Clusters(); - org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster(); - cluster.setName("cluster1"); - clusters.getClusters().add(cluster); - feed.setClusters(clusters); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - new Frequency("days(1)")); - - // lifecycle is defined only at global level - globalRetentionStage.setFrequency(new Frequency("hours(2)")); - globalLifecycle.setRetentionStage(globalRetentionStage); - feed.setLifecycle(globalLifecycle); - Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName())); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - feed.getLifecycle().getRetentionStage().getFrequency()); - - // lifecycle is defined at both global and cluster level - Lifecycle clusterLifecycle = new Lifecycle(); - RetentionStage clusterRetentionStage = new RetentionStage(); - clusterRetentionStage.setFrequency(new Frequency("hours(4)")); - clusterLifecycle.setRetentionStage(clusterRetentionStage); - feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle); - Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName())); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - cluster.getLifecycle().getRetentionStage().getFrequency()); - - // lifecycle at both level - retention only at cluster level. - feed.getLifecycle().setRetentionStage(null); - Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName())); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - cluster.getLifecycle().getRetentionStage().getFrequency()); - - // lifecycle at both level - retention only at global level. - feed.getLifecycle().setRetentionStage(globalRetentionStage); - feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(null); - Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName())); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - feed.getLifecycle().getRetentionStage().getFrequency()); - - // lifecycle is defined only at cluster level - feed.setLifecycle(null); - feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(clusterRetentionStage); - Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName())); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - cluster.getLifecycle().getRetentionStage().getFrequency()); - } - - @Test - public void testGetRetentionFrequency() throws Exception { - Feed feed = new Feed(); - feed.setFrequency(new Frequency("days(10)")); - - // no retention stage frequency defined - test both daily and monthly feeds - Lifecycle globalLifecycle = new Lifecycle(); - RetentionStage globalRetentionStage = new RetentionStage(); - globalLifecycle.setRetentionStage(globalRetentionStage); - feed.setLifecycle(globalLifecycle); - - Clusters clusters = new Clusters(); - org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster(); - cluster.setName("cluster1"); - clusters.getClusters().add(cluster); - feed.setClusters(clusters); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - new Frequency("days(10)")); - - feed.setFrequency(new Frequency("hours(1)")); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - new Frequency("hours(6)")); - - feed.setFrequency(new Frequency("minutes(10)")); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - new Frequency("hours(6)")); - - feed.setFrequency(new Frequency("hours(7)")); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - new Frequency("hours(7)")); - - feed.setFrequency(new Frequency("days(2)")); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - new Frequency("days(2)")); - - // lifecycle at both level - retention only at global level. - feed.setFrequency(new Frequency("hours(1)")); - globalRetentionStage.setFrequency(new Frequency("hours(2)")); - globalLifecycle.setRetentionStage(globalRetentionStage); - feed.setLifecycle(globalLifecycle); - - Lifecycle clusterLifecycle = new Lifecycle(); - RetentionStage clusterRetentionStage = new RetentionStage(); - clusterLifecycle.setRetentionStage(clusterRetentionStage); - feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - new Frequency("hours(6)")); - - // lifecycle at both level - retention only at cluster level. - feed.getLifecycle().getRetentionStage().setFrequency(null); - clusterRetentionStage.setFrequency(new Frequency("hours(4)")); - Assert.assertEquals(FeedHelper.getLifecycleRetentionFrequency(feed, cluster.getName()), - new Frequency("hours(4)")); - } - - @Test - public void testFeedImportSnapshot() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = importFeedSnapshot(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC"); - org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName()); - Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster); - Assert.assertNotNull(feed.getClusters().getClusters()); - Assert.assertNotNull(feed.getClusters().getClusters().get(0)); - Assert.assertNotNull(feed.getClusters().getClusters().get(0).getValidity()); - Assert.assertNotNull(feed.getClusters().getClusters().get(0).getValidity().getStart()); - Assert.assertNotNull(startInstResult); - Assert.assertNotNull(feedCluster.getValidity().getStart()); - Assert.assertEquals(getDate("2012-02-07 00:00 UTC"), feedCluster.getValidity().getStart()); - Assert.assertTrue(FeedHelper.isImportEnabled(feedCluster)); - Assert.assertEquals(MergeType.SNAPSHOT, FeedHelper.getImportMergeType(feedCluster)); - Assert.assertEquals(startInstResult, feedCluster.getValidity().getStart()); - } - - @Test - public void testFeedImportFields() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = importFeedSnapshot(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC"); - org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName()); - Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster); - List<String> fieldList = FeedHelper.getImportFieldList(feedCluster); - Assert.assertEquals(2, fieldList.size()); - Assert.assertFalse(FeedHelper.isFieldExcludes(feedCluster.getImport().getSource())); - } - - @Test - public void testFeedImportAppend() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = importFeedAppend(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC"); - org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName()); - Date startInstResult = FeedHelper.getImportInitalInstance(feedCluster); - Assert.assertEquals(startInstResult, feed.getClusters().getClusters().get(0).getValidity().getStart()); - } - - public void testGetFeedClusterValidity() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC"); - Validity validity = FeedHelper.getClusterValidity(feed, cluster.getName()); - Assert.assertEquals(validity.getStart(), getDate("2012-02-07 00:00 UTC")); - Assert.assertEquals(validity.getEnd(), getDate("2020-02-25 00:00 UTC")); - } - - @Test(expectedExceptions = FalconException.class) - public void testGetClusterValidityInvalidCluster() throws Exception { - Cluster cluster = publishCluster(); - Feed feed = publishFeed(cluster, "hours(1)", "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC"); - FeedHelper.getClusterValidity(feed, "abracadabra"); - } - - private Validity getFeedValidity(String start, String end) throws ParseException { - Validity validity = new Validity(); - validity.setStart(getDate(start)); - validity.setEnd(getDate(end)); - return validity; - } - - private org.apache.falcon.entity.v0.process.Validity getProcessValidity(String start, String end) throws - ParseException { - - org.apache.falcon.entity.v0.process.Validity validity = new org.apache.falcon.entity.v0.process.Validity(); - validity.setStart(getDate(start)); - validity.setEnd(getDate(end)); - return validity; - } - - private Date getDate(String dateString) throws ParseException { - DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z"); - return format.parse(dateString); - } - - private Cluster publishCluster() throws FalconException { - Cluster cluster = new Cluster(); - cluster.setName("feedCluster"); - cluster.setColo("colo"); - store.publish(EntityType.CLUSTER, cluster); - return cluster; - - } - - private Feed publishFeed(Cluster cluster, String frequency, String start, String end) - throws FalconException, ParseException { - return publishFeed(cluster, frequency, start, end, null); - } - - private Feed publishFeed(Cluster cluster, String frequency, String start, String end, Import imp) - throws FalconException, ParseException { - - Feed feed = new Feed(); - feed.setName("feed"); - Frequency f = new Frequency(frequency); - feed.setFrequency(f); - feed.setTimezone(UTC); - Clusters fClusters = new Clusters(); - org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster(); - fCluster.setType(ClusterType.SOURCE); - fCluster.setImport(imp); - fCluster.setName(cluster.getName()); - fCluster.setValidity(getFeedValidity(start, end)); - fClusters.getClusters().add(fCluster); - feed.setClusters(fClusters); - store.publish(EntityType.FEED, feed); - - return feed; - } - - private Process prepareProcess(Cluster cluster, String frequency, String start, String end) throws ParseException { - Process process = new Process(); - process.setName("process"); - process.setTimezone(UTC); - org.apache.falcon.entity.v0.process.Clusters pClusters = new org.apache.falcon.entity.v0.process.Clusters(); - org.apache.falcon.entity.v0.process.Cluster pCluster = new org.apache.falcon.entity.v0.process.Cluster(); - pCluster.setName(cluster.getName()); - org.apache.falcon.entity.v0.process.Validity validity = getProcessValidity(start, end); - pCluster.setValidity(validity); - pClusters.getClusters().add(pCluster); - process.setClusters(pClusters); - Frequency f = new Frequency(frequency); - process.setFrequency(f); - return process; - } - - private Feed importFeedSnapshot(Cluster cluster, String frequency, String start, String end) - throws FalconException, ParseException { - - Import imp = getAnImport(MergeType.SNAPSHOT); - Feed feed = publishFeed(cluster, frequency, start, end, imp); - return feed; - } - - private Feed importFeedAppend(Cluster cluster, String frequency, String start, String end) - throws FalconException, ParseException { - - Import imp = getAnImport(MergeType.APPEND); - Feed feed = publishFeed(cluster, frequency, start, end); - return feed; - } - - private Import getAnImport(MergeType mergeType) { - Extract extract = new Extract(); - extract.setType(ExtractMethod.FULL); - extract.setMergepolicy(mergeType); - - FieldIncludeExclude fieldInclude = new FieldIncludeExclude(); - fieldInclude.getFields().add("id"); - fieldInclude.getFields().add("name"); - FieldsType fields = new FieldsType(); - fields.setIncludes(fieldInclude); - - Datasource source = new Datasource(); - source.setName("test-db"); - source.setTableName("test-table"); - source.setExtract(extract); - source.setFields(fields); - - Argument a1 = new Argument(); - a1.setName("--split_by"); - a1.setValue("id"); - Argument a2 = new Argument(); - a2.setName("--num-mappers"); - a2.setValue("2"); - Arguments args = new Arguments(); - List<Argument> argList = args.getArguments(); - argList.add(a1); - argList.add(a2); - - Import imp = new Import(); - imp.setSource(source); - imp.setArguments(args); - return imp; - } -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java deleted file mode 100644 index 30edd94..0000000 --- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java +++ /dev/null @@ -1,534 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity; - -import org.apache.falcon.FalconException; -import org.apache.falcon.cluster.util.EmbeddedCluster; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.AccessControlList; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.entity.v0.feed.Cluster; -import org.apache.falcon.entity.v0.feed.Clusters; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.entity.v0.feed.Locations; -import org.apache.falcon.entity.v0.feed.Validity; -import org.apache.falcon.expression.ExpressionHelper; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.security.CurrentUser; -import org.apache.falcon.util.FalconTestUtil; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Date; -import java.util.List; -import java.util.Properties; -import java.util.Random; -import java.util.TimeZone; - -/** - * Test class for File System Storage. - */ -public class FileSystemStorageTest { - - private static final String USER = FalconTestUtil.TEST_USER_1; - - @BeforeClass - public void setUp() { - CurrentUser.authenticate(USER); - } - - @Test - public void testGetType() throws Exception { - final Location location = new Location(); - location.setPath("/foo/bar"); - location.setType(LocationType.DATA); - List<Location> locations = new ArrayList<Location>(); - locations.add(location); - - FileSystemStorage storage = new FileSystemStorage(FileSystemStorage.FILE_SYSTEM_URL, locations); - Assert.assertEquals(storage.getType(), Storage.TYPE.FILESYSTEM); - } - - @Test - public void testCreateFromUriTemplate() throws Exception { - String feedBasePath = "DATA=hdfs://localhost:8020" - + "/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}" - + "#" - + "META=hdfs://localhost:8020" - + "/meta/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}" - + "#" - + "STATS=hdfs://localhost:8020" - + "/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}"; - - FileSystemStorage storage = new FileSystemStorage(feedBasePath); - Assert.assertEquals(storage.getUriTemplate(), feedBasePath); - - Assert.assertEquals("hdfs://localhost:8020", storage.getStorageUrl()); - Assert.assertEquals("hdfs://localhost:8020/data/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}", - storage.getUriTemplate(LocationType.DATA)); - Assert.assertEquals("hdfs://localhost:8020/stats/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}", - storage.getUriTemplate(LocationType.STATS)); - Assert.assertEquals("hdfs://localhost:8020/meta/YYYY/feed1/mmHH/dd/MM/${YEAR}-${MONTH}-${DAY}/more/${YEAR}", - storage.getUriTemplate(LocationType.META)); - } - - @Test - public void testGetUriTemplateForData() throws Exception { - final Location location = new Location(); - location.setPath("/foo/bar"); - location.setType(LocationType.DATA); - List<Location> locations = new ArrayList<Location>(); - locations.add(location); - - FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "jail://global:00/foo/bar"); - } - - @Test - public void testFSHomeDir() { - final Location location = new Location(); - location.setPath("foo/bar"); // relative path - location.setType(LocationType.DATA); - List<Location> locations = new ArrayList<Location>(); - locations.add(location); - - FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations); - Assert.assertEquals(storage.getWorkingDir().toString(), "/user/" + FalconTestUtil.TEST_USER_1); - } - - @Test - public void testGetUriTemplateForDataWithRelativePath() throws Exception { - final Location location = new Location(); - location.setPath("foo/bar"); // relative path - location.setType(LocationType.DATA); - List<Location> locations = new ArrayList<Location>(); - locations.add(location); - - FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), - "hdfs://localhost:41020/user/" + USER + "/foo/bar"); - - storage = new FileSystemStorage("hdfs://localhost:41020/", locations); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), - "hdfs://localhost:41020/user/" + USER + "/foo/bar"); - } - - @Test - public void testGetUriTemplateForDataWithAbsolutePath() throws Exception { - final Location location = new Location(); - location.setPath("/foo/bar"); // absolute path - location.setType(LocationType.DATA); - List<Location> locations = new ArrayList<Location>(); - locations.add(location); - - FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar"); - - storage = new FileSystemStorage("hdfs://localhost:41020/", locations); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar"); - } - - @Test - public void testGetUriTemplateForDataWithAbsoluteURL() throws Exception { - final String absoluteUrl = "s3://host:1000/foo/bar"; - final Location location = new Location(); - location.setPath(absoluteUrl); // absolute url - location.setType(LocationType.DATA); - List<Location> locations = new ArrayList<Location>(); - locations.add(location); - - FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), absoluteUrl); - - storage = new FileSystemStorage("hdfs://localhost:41020/", locations); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), absoluteUrl); - } - - @Test - public void testValidateACL() throws Exception { - final Location location = new Location(); - Path path = new Path("/foo/bar"); - location.setPath(path.toString()); - location.setType(LocationType.DATA); - List<Location> locations = new ArrayList<Location>(); - locations.add(location); - - String user = System.getProperty("user.name"); - EmbeddedCluster cluster = EmbeddedCluster.newCluster(user); - FileSystem fs = cluster.getFileSystem(); - fs.mkdirs(path); - - FileSystemStorage storage = new FileSystemStorage( - cluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY), locations); - storage.validateACL(new TestACL(user, user, "0x755")); - - //-ve case - try { - storage.validateACL(new TestACL("random", user, "0x755")); - Assert.fail("Validation should have failed"); - } catch(FalconException e) { - //expected exception - } - - //Timed path - location.setPath("/foo/bar/${YEAR}/${MONTH}/${DAY}"); - storage.validateACL(new TestACL(user, user, "rrr")); - - //-ve case - try { - storage.validateACL(new TestACL("random", user, "0x755")); - Assert.fail("Validation should have failed"); - } catch(FalconException e) { - //expected exception - } - } - - @DataProvider(name = "locationTestWithRelativePathDataProvider") - private Object[][] createLocationTestDataWithRelativePath() { - return new Object[][] { - {"hdfs://h:0", "localDC/rc/billing/ua2", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"}, - {"hdfs://h:0/", "localDC/rc/billing/ua2", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"}, - {"hdfs://h:0", "localDC/rc/billing/ua2/", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"}, - {"hdfs://h:0/", "localDC/rc/billing/ua2/", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"}, - {"hdfs://h:0", "localDC/rc/billing/ua2//", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"}, - {"hdfs://h:0/", "localDC/rc/billing/ua2//", "hdfs://h:0/user/" + USER + "/localDC/rc/billing/ua2"}, - {"${nameNode}", "localDC/rc/billing/ua2", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"}, - {"${nameNode}/", "localDC/rc/billing/ua2", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"}, - {"${nameNode}", "localDC/rc/billing/ua2/", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"}, - {"${nameNode}/", "localDC/rc/billing/ua2/", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"}, - {"${nameNode}", "localDC/rc/billing/ua2//", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"}, - {"${nameNode}/", "localDC/rc/billing/ua2//", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"}, - {"${nameNode}/", "localDC/rc/billing/ua2//", "${nameNode}/user/" + USER + "/localDC/rc/billing/ua2"}, - {"${nameNode}", "s3://h:p/localDC/rc/billing/ua2//", "s3://h:p/localDC/rc/billing/ua2"}, - {"${nameNode}/", "s3://h:p/localDC/rc/billing/ua2//", "s3://h:p/localDC/rc/billing/ua2"}, - {"hdfs://h:0", "s3://h:p/localDC/rc/billing/ua2//", "s3://h:p/localDC/rc/billing/ua2"}, - {"hdfs://h:0/", "s3://h:p/localDC/rc/billing/ua2//", "s3://h:p/localDC/rc/billing/ua2"}, - }; - } - - @Test (dataProvider = "locationTestWithRelativePathDataProvider") - public void testGetUriTemplateWithRelativePath(String storageUrl, String path, - String expected) throws Exception { - final Location location = new Location(); - location.setPath(path); - location.setType(LocationType.DATA); - List<Location> locations = new ArrayList<Location>(); - locations.add(location); - - FileSystemStorage storage = new FileSystemStorage(storageUrl, locations); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), expected); - } - - @Test - public void testGetUriTemplate() throws Exception { - final Location dataLocation = new Location(); - dataLocation.setPath("/data/foo/bar"); - dataLocation.setType(LocationType.DATA); - - final Location metaLocation = new Location(); - metaLocation.setPath("/meta/foo/bar"); - metaLocation.setType(LocationType.META); - - final Location statsLocation = new Location(); - statsLocation.setPath("/stats/foo/bar"); - statsLocation.setType(LocationType.STATS); - - final Location tmpLocation = new Location(); - tmpLocation.setPath("/tmp/foo/bar"); - tmpLocation.setType(LocationType.TMP); - - List<Location> locations = new ArrayList<Location>(); - locations.add(dataLocation); - locations.add(metaLocation); - locations.add(statsLocation); - locations.add(tmpLocation); - - StringBuilder expected = new StringBuilder(); - expected.append(LocationType.DATA) - .append(FileSystemStorage.LOCATION_TYPE_SEP) - .append("jail://global:00/data/foo/bar") - .append(FileSystemStorage.FEED_PATH_SEP) - .append(LocationType.META) - .append(FileSystemStorage.LOCATION_TYPE_SEP) - .append("jail://global:00/meta/foo/bar") - .append(FileSystemStorage.FEED_PATH_SEP) - .append(LocationType.STATS) - .append(FileSystemStorage.LOCATION_TYPE_SEP) - .append("jail://global:00/stats/foo/bar") - .append(FileSystemStorage.FEED_PATH_SEP) - .append(LocationType.TMP) - .append(FileSystemStorage.LOCATION_TYPE_SEP) - .append("jail://global:00/tmp/foo/bar"); - - FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations); - Assert.assertEquals(storage.getUriTemplate(), expected.toString()); - } - - @Test - public void testGetUriTemplateWithOutStorageURL() throws Exception { - final Location location = new Location(); - location.setPath("/foo/bar"); - location.setType(LocationType.DATA); - List<Location> locations = new ArrayList<Location>(); - locations.add(location); - - FileSystemStorage storage = new FileSystemStorage(FileSystemStorage.FILE_SYSTEM_URL, locations); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "${nameNode}/foo/bar"); - } - - @DataProvider(name = "locationTestDataProvider") - private Object[][] createLocationTestData() { - return new Object[][] { - {"jail://global:00", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"}, - {"jail://global:00", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"}, - {"jail://global:00", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"}, - {"${nameNode}", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"}, - {"${nameNode}", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"}, - {"${nameNode}", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"}, - }; - } - - @Test (dataProvider = "locationTestDataProvider") - public void testGetUriTemplateWithLocationType(String storageUrl, String path, - String expected) throws Exception { - final Location location = new Location(); - location.setPath(path); - location.setType(LocationType.DATA); - List<Location> locations = new ArrayList<Location>(); - locations.add(location); - - FileSystemStorage storage = new FileSystemStorage(storageUrl, locations); - Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), storageUrl + expected); - } - - @Test - public void testIsIdentical() throws Exception { - final String storageUrl = "jail://global:00"; - final Location location1 = new Location(); - location1.setPath("/foo/bar"); - location1.setType(LocationType.DATA); - List<Location> locations1 = new ArrayList<Location>(); - locations1.add(location1); - FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1); - - final Location location2 = new Location(); - location2.setPath("/foo/bar"); - location2.setType(LocationType.DATA); - List<Location> locations2 = new ArrayList<Location>(); - locations2.add(location2); - FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2); - - Assert.assertTrue(storage1.isIdentical(storage2)); - } - - @Test - public void testIsIdenticalNegative() throws Exception { - final String storageUrl = "jail://global:00"; - final Location location1 = new Location(); - location1.setPath("/foo/baz"); - location1.setType(LocationType.DATA); - List<Location> locations1 = new ArrayList<Location>(); - locations1.add(location1); - FileSystemStorage storage1 = new FileSystemStorage(storageUrl, locations1); - - final Location location2 = new Location(); - location2.setPath("/foo/bar"); - location2.setType(LocationType.DATA); - List<Location> locations2 = new ArrayList<Location>(); - locations2.add(location2); - FileSystemStorage storage2 = new FileSystemStorage(storageUrl, locations2); - - Assert.assertFalse(storage1.isIdentical(storage2)); - } - - private class TestACL extends AccessControlList { - - /** - * owner is the Owner of this entity. - */ - private String owner; - - /** - * group is the one which has access to read - not used at this time. - */ - private String group; - - /** - * permission is not enforced at this time. - */ - private String permission; - - TestACL(String owner, String group, String permission) { - this.owner = owner; - this.group = group; - this.permission = permission; - } - - @Override - public String getOwner() { - return owner; - } - - @Override - public String getGroup() { - return group; - } - - @Override - public String getPermission() { - return permission; - } - } - - @DataProvider(name = "testListingDataProvider") - private Object[][] createTestListingData() { - final long millis = 24L * 3600 * 1000; - final long now = System.currentTimeMillis(); - TimeZone utc = TimeZone.getTimeZone("UTC"); - return new Object[][] { - {null, Frequency.fromString("hours(2)"), utc, new Date(now - 60 * millis), new Date(now - 56 * millis)}, - {null, Frequency.fromString("days(1)"), utc, new Date(now - 20 * millis), new Date(now + 6 * millis)}, - {null, Frequency.fromString("months(1)"), utc, new Date(now - 85 * millis), new Date(now - 10 * millis)}, - }; - } - - @Test (dataProvider = "testListingDataProvider") - public void testListing(String availabilityFlag, Frequency frequency, TimeZone timeZone, - Date start, Date end) throws Exception { - EmbeddedCluster cluster = EmbeddedCluster.newCluster("TestFeedListing", false); - FileSystem fs = cluster.getFileSystem(); - ConfigurationStore.get().publish(EntityType.CLUSTER, cluster.getCluster()); - try { - Feed feed = getFeed(availabilityFlag, frequency, timeZone); - List<FeedInstanceStatus> expected = prepareData(fs, feed, start, end); - FileSystemStorage fileSystemStorage = new FileSystemStorage(cluster.getFileSystem(). - getUri().toString(), feed.getLocations()); - List<FeedInstanceStatus> actual = fileSystemStorage. - getListing(feed, "TestFeedListing", LocationType.DATA, start, end); - Assert.assertEquals(actual, expected, "Feed instance Listings doesn't match"); - } finally { - ConfigurationStore.get().remove(EntityType.CLUSTER, cluster.getCluster().getName()); - } - } - - @SuppressWarnings("MagicConstant") - private List<FeedInstanceStatus> prepareData(FileSystem fs, Feed feed, - Date start, Date end) throws Exception { - fs.delete(new Path("/TestFeedListing"), true); - Random random = new Random(); - List<FeedInstanceStatus> instances = new ArrayList<FeedInstanceStatus>(); - String basePath = feed.getLocations().getLocations().get(0).getPath(); - Frequency frequency = feed.getFrequency(); - TimeZone tz = feed.getTimezone(); - Date dataStart = EntityUtil.getNextStartTime(feed.getClusters().getClusters().get(0).getValidity().getStart(), - feed.getFrequency(), tz, new Date(start.getTime())); - Date dataEnd = new Date(end.getTime()); - while (dataStart.before(dataEnd)) { - Properties properties = ExpressionHelper.getTimeVariables(dataStart, tz); - String path = ExpressionHelper.substitute(basePath, properties); - FeedInstanceStatus instance = new FeedInstanceStatus(path); - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.MISSING); - instance.setSize(-1); - instance.setCreationTime(0); - Date date = FeedHelper.getDate(basePath, new Path(path), tz); - instance.setInstance(SchemaHelper.formatDateUTC(date)); - Calendar cal = Calendar.getInstance(); - cal.setTime(dataStart); - cal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt()); - dataStart.setTime(cal.getTimeInMillis()); - if (random.nextBoolean()) { - OutputStream out = fs.create(new Path(path, "file")); - out.write("Hello World\n".getBytes()); - out.close(); - instance.setSize(12); - if (feed.getAvailabilityFlag() == null - || (feed.getAvailabilityFlag() != null && random.nextBoolean())) { - //If availability is not present or if ok to create availability file, mark as available - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.AVAILABLE); - if (feed.getAvailabilityFlag() != null) { - fs.create(new Path(path, feed.getAvailabilityFlag())).close(); - } - } else if (feed.getAvailabilityFlag() != null) { - //If availability is present or not ok to create availability file, mark as partial - fs.mkdirs(new Path(path)); - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL); - } - } else { - if (feed.getAvailabilityFlag() == null && random.nextBoolean()) { - //If availability is not present or ok to create dir, mark as empty - fs.mkdirs(new Path(path)); - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY); - instance.setSize(0); - } else if (feed.getAvailabilityFlag() != null && random.nextBoolean()) { - //If availability is present and ok to create dir, mark as partial - fs.mkdirs(new Path(path)); - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.PARTIAL); - } else if (feed.getAvailabilityFlag() != null) { - //If availability is present and ok to create empty instance - fs.create(new Path(path, feed.getAvailabilityFlag())).close(); - instance.setStatus(FeedInstanceStatus.AvailabilityStatus.EMPTY); - instance.setSize(0); - } - } - try { - FileStatus fileStatus = fs.getFileStatus(new Path(path)); - instance.setCreationTime(fileStatus.getModificationTime()); - } catch (IOException e) { - //ignore - } - instances.add(instance); - } - return instances; - } - - private Feed getFeed(String availabilityFlag, Frequency frequency, TimeZone timeZone) { - Feed feed = new Feed(); - feed.setAvailabilityFlag(availabilityFlag); - feed.setFrequency(frequency); - feed.setTimezone(timeZone); - feed.setLocations(new Locations()); - Location dataLocation = new Location(); - feed.getLocations().getLocations().add(dataLocation); - dataLocation.setPath("/TestFeedListing/data/${YEAR}/${MONTH}/${DAY}" - + (frequency.getTimeUnit() == Frequency.TimeUnit.hours ? "/${HOUR}" : "") + "/MORE"); - dataLocation.setType(LocationType.DATA); - feed.setClusters(new Clusters()); - Cluster cluster = new Cluster(); - cluster.setName("TestFeedListing"); - feed.getClusters().getClusters().add(cluster); - Validity validity = new Validity(); - cluster.setValidity(validity); - validity.setStart(new Date(System.currentTimeMillis() - (1000L * 24 * 3600000))); - validity.setEnd(new Date(System.currentTimeMillis() + (1000L * 24 * 3600000))); - return feed; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java b/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java deleted file mode 100644 index c37cebd..0000000 --- a/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.entity; - -import junit.framework.Assert; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.cluster.Interface; -import org.apache.falcon.entity.v0.cluster.Interfaces; -import org.apache.falcon.entity.v0.cluster.Interfacetype; -import org.apache.falcon.entity.v0.cluster.Property; -import org.apache.falcon.security.SecurityUtil; -import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; -import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler; -import org.testng.annotations.Test; - -import java.util.Properties; - -/** - * Tests for HiveUtil. - */ -public class HiveUtilTest { - - @Test - public void testGetHiveCredentialsWithoutKerberos() { - StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, PseudoAuthenticationHandler.TYPE); - Cluster cluster = new Cluster(); - String metaStoreUrl = "thrift://localhost:19083"; - - // set registry interface - Interfaces interfaces = new Interfaces(); - Interface registry = new Interface(); - registry.setEndpoint(metaStoreUrl); - registry.setType(Interfacetype.REGISTRY); - registry.setVersion("0.1"); - interfaces.getInterfaces().add(registry); - cluster.setInterfaces(interfaces); - - Properties expected = new Properties(); - expected.put(HiveUtil.METASTORE_UGI, "true"); - expected.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat")); - expected.put(HiveUtil.METASTROE_URI, metaStoreUrl); - expected.put(HiveUtil.METASTOREURIS, metaStoreUrl); - - Properties actual = HiveUtil.getHiveCredentials(cluster); - Assert.assertTrue(actual.equals(expected)); - } - - @Test - public void testGetHiveCredentialsWithKerberos() { - StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, KerberosAuthenticationHandler.TYPE); - Cluster cluster = new Cluster(); - String metaStoreUrl = "thrift://localhost:19083"; - String principal = "kerberosPrincipal"; - - // set registry interface - Interfaces interfaces = new Interfaces(); - Interface registry = new Interface(); - registry.setEndpoint(metaStoreUrl); - registry.setType(Interfacetype.REGISTRY); - registry.setVersion("0.1"); - interfaces.getInterfaces().add(registry); - cluster.setInterfaces(interfaces); - - // set security properties - org.apache.falcon.entity.v0.cluster.Properties props = new org.apache.falcon.entity.v0.cluster.Properties(); - Property principal2 = new Property(); - principal2.setName(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL); - principal2.setValue(principal); - props.getProperties().add(principal2); - cluster.setProperties(props); - Properties expected = new Properties(); - expected.put(SecurityUtil.METASTORE_USE_THRIFT_SASL, "true"); - expected.put(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL, principal); - expected.put(SecurityUtil.METASTORE_PRINCIPAL, principal); - expected.put(HiveUtil.METASTORE_UGI, "true"); - expected.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat")); - expected.put(HiveUtil.METASTROE_URI, metaStoreUrl); - expected.put(HiveUtil.METASTOREURIS, metaStoreUrl); - - Properties actual = HiveUtil.getHiveCredentials(cluster); - Assert.assertTrue(actual.equals(expected)); - } - - -} -
