http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java deleted file mode 100644 index 7159966..0000000 --- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java +++ /dev/null @@ -1,632 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.parser; - -import org.apache.falcon.FalconException; -import org.apache.falcon.cluster.util.EmbeddedCluster; -import org.apache.falcon.entity.AbstractTestBase; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Property; -import org.apache.falcon.entity.v0.process.Cluster; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.security.CurrentUser; -import org.apache.falcon.util.FalconTestUtil; -import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.fs.Path; -import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.Unmarshaller; -import java.io.IOException; -import java.io.InputStream; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.List; - -/** - * Tests for validating process entity parser. 
- */ -public class ProcessEntityParserTest extends AbstractTestBase { - - private final ProcessEntityParser parser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS); - - @Test - public void testNotNullgetUnmarshaller() throws Exception { - Unmarshaller unmarshaller = EntityType.PROCESS.getUnmarshaller(); - Assert.assertNotNull(unmarshaller); - } - - @BeforeClass - public void init() throws Exception { - this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); - this.conf = dfsCluster.getConf(); - } - - @AfterClass - public void tearDown() { - this.dfsCluster.shutdown(); - } - - @Override - @BeforeMethod - public void setup() throws Exception { - storeEntity(EntityType.CLUSTER, "testCluster"); - storeEntity(EntityType.FEED, "impressionFeed"); - storeEntity(EntityType.FEED, "clicksFeed"); - storeEntity(EntityType.FEED, "imp-click-join1"); - storeEntity(EntityType.FEED, "imp-click-join2"); - storeEntity(EntityType.PROCESS, "sample"); - dfsCluster.getFileSystem().mkdirs(new Path("/falcon/test/workflow")); - } - - @Test - public void testParse() throws FalconException, JAXBException { - - Process process = parser.parseAndValidate(getClass().getResourceAsStream(PROCESS_XML)); - - Assert.assertNotNull(process); - Assert.assertEquals(process.getName(), "sample"); - - Assert.assertEquals(process.getParallel(), 1); - Assert.assertEquals(process.getOrder().name(), "LIFO"); - Assert.assertEquals(process.getFrequency().toString(), "hours(1)"); - Assert.assertEquals(process.getEntityType(), EntityType.PROCESS); - - Assert.assertEquals(process.getTags(), - "consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting"); - Assert.assertEquals(process.getPipelines(), "testPipeline,dataReplication_Pipeline"); - - Assert.assertEquals(process.getInputs().getInputs().get(0).getName(), "impression"); - Assert.assertEquals(process.getInputs().getInputs().get(0).getFeed(), "impressionFeed"); - 
Assert.assertEquals(process.getInputs().getInputs().get(0).getStart(), "today(0,0)"); - Assert.assertEquals(process.getInputs().getInputs().get(0).getEnd(), "today(2,0)"); - Assert.assertEquals(process.getInputs().getInputs().get(0).getPartition(), "*/US"); - Assert.assertEquals(process.getInputs().getInputs().get(0).isOptional(), false); - - Assert.assertEquals(process.getOutputs().getOutputs().get(0).getName(), "impOutput"); - Assert.assertEquals(process.getOutputs().getOutputs().get(0).getFeed(), "imp-click-join1"); - Assert.assertEquals(process.getOutputs().getOutputs().get(0).getInstance(), "today(0,0)"); - - Assert.assertEquals(process.getProperties().getProperties().get(0).getName(), "name1"); - Assert.assertEquals(process.getProperties().getProperties().get(0).getValue(), "value1"); - - Cluster processCluster = process.getClusters().getClusters().get(0); - Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()), "2011-11-02T00:00Z"); - Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()), "2091-12-30T00:00Z"); - Assert.assertEquals(process.getTimezone().getID(), "UTC"); - - Assert.assertEquals(process.getSla().getShouldStartIn().toString(), "hours(2)"); - Assert.assertEquals(process.getSla().getShouldEndIn().toString(), "hours(4)"); - - Assert.assertEquals(process.getWorkflow().getEngine().name().toLowerCase(), "oozie"); - Assert.assertEquals(process.getWorkflow().getPath(), "/falcon/test/workflow"); - - StringWriter stringWriter = new StringWriter(); - Marshaller marshaller = EntityType.PROCESS.getMarshaller(); - marshaller.marshal(process, stringWriter); - System.out.println(stringWriter.toString()); - - // TODO for retry and late policy - } - - @Test - public void testELExpressions() throws Exception { - Process process = parser.parseAndValidate(getClass().getResourceAsStream(PROCESS_XML)); - process.getInputs().getInputs().get(0).setStart("lastMonth(0,0,0)"); - try { - 
parser.validate(process); - throw new AssertionError("Expected ValidationException!"); - } catch (ValidationException e) { - //ignore - } - - process.getInputs().getInputs().get(0).setStart("today(0,0)"); - process.getInputs().getInputs().get(0).setEnd("lastMonth(0,0,0)"); - try { - parser.validate(process); - throw new AssertionError("Expected ValidationException!"); - } catch (ValidationException e) { - //ignore - } - - process.getInputs().getInputs().get(0).setStart("today(2,0)"); - process.getInputs().getInputs().get(0).setEnd("today(0,0)"); - try { - parser.validate(process); - throw new AssertionError("Expected ValidationException!"); - } catch (ValidationException e) { - //ignore - } - } - - @Test(expectedExceptions = FalconException.class, expectedExceptionsMessageRegExp = "shouldStartIn of Process:.*") - public void testInvalidShouldStart() throws FalconException { - Process process = parser.parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getSla().setShouldStartIn(new Frequency("hours(4)")); - process.getSla().setShouldEndIn(new Frequency("hours(2)")); - parser.validate(process); - } - - - @Test(expectedExceptions = FalconException.class, - expectedExceptionsMessageRegExp = ".* greater than timeout.*") - public void testShouldStartGreaterThanTimeout() throws FalconException { - Process process = parser.parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getSla().setShouldStartIn(new Frequency("hours(2)")); - process.setTimeout(new Frequency("hours(1)")); - parser.validate(process); - } - - - - @Test(expectedExceptions = FalconException.class) - public void doParseInvalidXML() throws IOException, FalconException { - - String invalidProcessXml = "/config/process/process-invalid.xml"; - parser.parseAndValidate(this.getClass().getResourceAsStream(invalidProcessXml)); - } - - @Test(expectedExceptions = ValidationException.class) - public void 
applyValidationInvalidProcess() throws Exception { - Process process = parser.parseAndValidate(getClass().getResourceAsStream(PROCESS_XML)); - process.getClusters().getClusters().get(0).setName("invalid cluster"); - parser.validate(process); - } - - @Test(expectedExceptions = FalconException.class) - public void testValidate() throws FalconException { - parser.parseAndValidate("<process></process>"); - } - - //SUSPEND CHECKSTYLE CHECK HiddenFieldCheck - @Test - public void testConcurrentParsing() throws Exception { - List<Thread> threadList = new ArrayList<Thread>(); - - for (int i = 0; i < 3; i++) { - threadList.add(new Thread() { - @Override - public void run() { - try { - EntityParser parser = EntityParserFactory.getParser(EntityType.PROCESS); - parser.parseAndValidate(this.getClass().getResourceAsStream(PROCESS_XML)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - } - for (Thread thread : threadList) { - thread.start(); - } - for (Thread thread : threadList) { - thread.join(); - } - } - //RESUME CHECKSTYLE CHECK HiddenFieldCheck - - @Test(expectedExceptions = ValidationException.class) - public void testInvalidDependentFeedsRetentionLimit() throws Exception { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getInputs().getInputs().get(0).setStart("today(-48,0)"); - parser.validate(process); - } - - @Test(expectedExceptions = ValidationException.class) - public void testDuplicateInputOutputNames() throws FalconException { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getInputs().getInputs().get(0).setName("duplicateName"); - process.getOutputs().getOutputs().get(0).setName("duplicateName"); - parser.validate(process); - } - - @Test(expectedExceptions = FalconException.class) - public void testInvalidRetryAttempt() throws FalconException { - Process process = parser - 
.parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getRetry().setAttempts(-1); - parser.parseAndValidate(process.toString()); - } - - @Test(expectedExceptions = FalconException.class) - public void testInvalidRetryDelay() throws FalconException { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getRetry().setDelay(Frequency.fromString("hours(0)")); - parser.parseAndValidate(process.toString()); - } - - @Test() - public void testRetryTimeout() throws FalconException { - Process process = parser - .parseAndValidate(ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML)); - process.getRetry().setOnTimeout(new Boolean("true")); - parser.parseAndValidate(process.toString()); - } - - @Test(expectedExceptions = ValidationException.class) - public void testInvalidLateInputs() throws Exception { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getLateProcess().getLateInputs().get(0).setInput("invalidInput"); - parser.parseAndValidate(process.toString()); - } - - @Test(expectedExceptions = FalconException.class) - public void testInvalidProcessName() throws Exception { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.setName("name_with_underscore"); - parser.parseAndValidate(process.toString()); - } - - @Test - public void testOozieFutureExpression() throws Exception { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getInputs().getInputs().get(0).setStart("future(1,2)"); - parser.parseAndValidate(process.toString()); - } - - @Test - public void testOozieLatestExpression() throws Exception { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - 
process.getInputs().getInputs().get(0).setStart("latest(-1)"); - parser.parseAndValidate(process.toString()); - } - - @Test(expectedExceptions = ValidationException.class) - public void testDuplicateClusterName() throws Exception { - Process process = parser - .parse((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getClusters().getClusters().add(1, process.getClusters().getClusters().get(0)); - parser.validate(process); - } - - @Test - public void testProcessForTableStorage() throws Exception { - Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal( - this.getClass().getResource("/config/feed/hive-table-feed.xml")); - getStore().publish(EntityType.FEED, inFeed); - - Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal( - this.getClass().getResource("/config/feed/hive-table-feed-out.xml")); - getStore().publish(EntityType.FEED, outFeed); - - Process process = parser.parse( - ProcessEntityParserTest.class.getResourceAsStream("/config/process/process-table.xml")); - Input input = process.getInputs().getInputs().get(0); - Assert.assertFalse(input.isOptional()); - parser.validate(process); - - // Test Optional Inputs For Table Storage - try { - input.setOptional(Boolean.TRUE); - Assert.assertTrue(input.isOptional()); - parser.validate(process); - Assert.fail("Validation exception must have been thrown."); - } catch (FalconException e) { - Assert.assertTrue(e instanceof ValidationException); - } - } - - @Test(expectedExceptions = ValidationException.class) - public void testValidateInputPartitionForTable() throws Exception { - Process process = parser.parse( - ProcessEntityParserTest.class.getResourceAsStream("/config/process/process-table.xml")); - if (process.getInputs() != null) { - for (Input input : process.getInputs().getInputs()) { - input.setPartition("region=usa"); - } - } - - parser.validate(process); - Assert.fail("An exception should have been thrown since Input partitions are not supported for table 
storage"); - } - - @Test - public void testValidateEmailNotification() throws Exception { - Process process = parser.parseAndValidate(getClass().getResourceAsStream(PROCESS_XML)); - Assert.assertNotNull(process.getNotification()); - Assert.assertEquals(process.getNotification().getTo(), "falcon@localhost"); - Assert.assertEquals(process.getNotification().getType(), "email"); - } - - @Test - public void testValidateACLWithNoACLAndAuthorizationDisabled() throws Exception { - InputStream stream = this.getClass().getResourceAsStream(PROCESS_XML); - - Process process = parser.parse(stream); - Assert.assertNotNull(process); - Assert.assertNull(process.getACL()); - - parser.validate(process); - } - - @Test - public void testValidateACLWithACLAndAuthorizationDisabled() throws Exception { - InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml"); - - Process process = parser.parse(stream); - Assert.assertNotNull(process); - Assert.assertNotNull(process.getACL()); - Assert.assertNotNull(process.getACL().getOwner()); - Assert.assertNotNull(process.getACL().getGroup()); - Assert.assertNotNull(process.getACL().getPermission()); - - parser.validate(process); - } - - @Test (expectedExceptions = ValidationException.class) - public void testValidateACLWithNoACLAndAuthorizationEnabled() throws Exception { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true"); - Assert.assertTrue(Boolean.valueOf( - StartupProperties.get().getProperty("falcon.security.authorization.enabled"))); - CurrentUser.authenticate(FalconTestUtil.TEST_USER_1); - - try { - // need a new parser since it caches authorization enabled flag - ProcessEntityParser processEntityParser = - (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS); - InputStream stream = this.getClass().getResourceAsStream(PROCESS_XML); - - Process process = processEntityParser.parse(stream); - Assert.assertNotNull(process); - 
Assert.assertNull(process.getACL()); - - processEntityParser.validate(process); - Assert.fail("Validation exception should have been thrown for empty ACL"); - } finally { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false"); - } - } - - @Test (expectedExceptions = ValidationException.class) - public void testValidateACLAuthorizationEnabledValidOwnerBadGroup() throws Exception { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true"); - Assert.assertTrue(Boolean.valueOf( - StartupProperties.get().getProperty("falcon.security.authorization.enabled"))); - CurrentUser.authenticate(FalconTestUtil.TEST_USER_1); - - try { - InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml"); - - // need a new parser since it caches authorization enabled flag - ProcessEntityParser processEntityParser = - (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS); - Process process = processEntityParser.parseAndValidate(stream); - Assert.assertNotNull(process); - Assert.assertNotNull(process.getACL()); - Assert.assertNotNull(process.getACL().getOwner()); - Assert.assertNotNull(process.getACL().getGroup()); - Assert.assertNotNull(process.getACL().getPermission()); - } finally { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false"); - } - } - - @Test - public void testValidateACLAuthorizationEnabledValidGroupBadOwner() throws Exception { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true"); - Assert.assertTrue(Boolean.valueOf( - StartupProperties.get().getProperty("falcon.security.authorization.enabled"))); - CurrentUser.authenticate(USER); // valid user but acl owner is falcon - - try { - InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml"); - - // need a new parser since it caches authorization enabled flag - ProcessEntityParser processEntityParser = - 
(ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS); - Process process = processEntityParser.parse(stream); - Assert.assertNotNull(process); - Assert.assertNotNull(process.getACL()); - Assert.assertNotNull(process.getACL().getOwner()); - Assert.assertNotNull(process.getACL().getGroup()); - Assert.assertNotNull(process.getACL().getPermission()); - - process.getACL().setOwner(USER); - process.getACL().setGroup(getPrimaryGroupName()); - - processEntityParser.validate(process); - } finally { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false"); - } - } - - @Test (expectedExceptions = ValidationException.class) - public void testValidateACLAuthorizationEnabledBadOwnerAndGroup() throws Exception { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true"); - Assert.assertTrue(Boolean.valueOf( - StartupProperties.get().getProperty("falcon.security.authorization.enabled"))); - CurrentUser.authenticate("blah"); - - try { - InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml"); - - // need a new parser since it caches authorization enabled flag - ProcessEntityParser processEntityParser = - (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS); - Process process = processEntityParser.parse(stream); - - Assert.assertNotNull(process); - Assert.assertNotNull(process.getACL()); - Assert.assertNotNull(process.getACL().getOwner()); - Assert.assertNotNull(process.getACL().getGroup()); - Assert.assertNotNull(process.getACL().getPermission()); - - processEntityParser.validate(process); - Assert.fail("Validation exception should have been thrown for invalid owner"); - } finally { - StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false"); - } - } - - /** - * A negative test for validating pipelines tag which is comma separated values. 
- * @throws FalconException - */ - @Test - public void testPipelineTags() throws FalconException { - try { - InputStream stream = this.getClass().getResourceAsStream("/config/process/process-bad-pipeline.xml"); - - parser.parse(stream); - Assert.fail("org.xml.sax.SAXParseException should have been thrown."); - } catch (FalconException e) { - Assert.assertEquals(javax.xml.bind.UnmarshalException.class, e.getCause().getClass()); - } - } - - @Test(expectedExceptions = ValidationException.class) - public void testEndTimeProcessBeforeStartTime() throws Exception { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getClusters().getClusters().get(0).getValidity().setEnd( - SchemaHelper.parseDateUTC("2010-12-31T00:00Z")); - parser.validate(process); - } - - @Test(expectedExceptions = ValidationException.class) - public void testInstanceStartTimeBeforeFeedStartTimeForInput() throws Exception { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getClusters().getClusters().get(0).getValidity().setStart( - SchemaHelper.parseDateUTC("2011-10-31T00:00Z")); - parser.validate(process); - } - - @Test(expectedExceptions = ValidationException.class) - public void testInstanceEndTimeAfterFeedEndTimeForInput() throws Exception { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getClusters().getClusters().get(0).getValidity().setStart( - SchemaHelper.parseDateUTC("2011-12-31T00:00Z")); - parser.validate(process); - } - - @Test(expectedExceptions = ValidationException.class) - public void testInstanceTimeBeforeFeedStartTimeForOutput() throws Exception { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getClusters().getClusters().get(0).getValidity().setStart( - 
SchemaHelper.parseDateUTC("2011-11-02T00:00Z")); - process.getOutputs().getOutputs().get(0).setInstance("yesterday(-60,0)"); - parser.validate(process); - } - - @Test(expectedExceptions = ValidationException.class) - public void testInstanceTimeAfterFeedEndTimeForOutput() throws Exception { - Process process = parser - .parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getClusters().getClusters().get(0).getValidity().setStart( - SchemaHelper.parseDateUTC("2011-12-30T00:00Z")); - process.getOutputs().getOutputs().get(0).setInstance("today(120,0)"); - parser.validate(process); - } - - @Test - public void testValidateProcessProperties() throws Exception { - ProcessEntityParser processEntityParser = Mockito - .spy((ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS)); - InputStream stream = this.getClass().getResourceAsStream("/config/process/process-0.1.xml"); - Process process = parser.parse(stream); - - Mockito.doNothing().when(processEntityParser).validateACL(process); - - // Good set of properties, should work - processEntityParser.validate(process); - - // add duplicate property, should throw validation exception. - Property property1 = new Property(); - property1.setName("name1"); - property1.setValue("any value"); - process.getProperties().getProperties().add(property1); - try { - processEntityParser.validate(process); - Assert.fail(); // should not reach here - } catch (ValidationException e) { - // Do nothing - } - - // Remove duplicate property. It should not throw exception anymore - process.getProperties().getProperties().remove(property1); - processEntityParser.validate(process); - - // add empty property name, should throw validation exception. 
- property1.setName(""); - process.getProperties().getProperties().add(property1); - try { - processEntityParser.validate(process); - Assert.fail(); // should not reach here - } catch (ValidationException e) { - // Do nothing - } - } - - @Test - public void testProcessEndTimeOptional() throws FalconException { - Process process = parser.parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - process.getClusters().getClusters().get(0).getValidity().setEnd(null); - parser.validate(process); - } - - @Test - public void testProcessEndTime() throws FalconException { - Process process = parser.parseAndValidate((ProcessEntityParserTest.class - .getResourceAsStream(PROCESS_XML))); - String feedName = process.getInputs().getInputs().get(0).getFeed(); - Feed feedEntity = EntityUtil.getEntity(EntityType.FEED, feedName); - feedEntity.getClusters().getClusters().get(0).getValidity().setEnd(null); - process.getClusters().getClusters().get(0).getValidity().setEnd(null); - parser.validate(process); - } -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java deleted file mode 100644 index fa3d3f4..0000000 --- a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.store; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.service.ConfigurationChangeListener; -import org.apache.falcon.util.StartupProperties; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterSuite; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeSuite; -import org.testng.annotations.Test; - -import java.io.IOException; - -/** - * Tests for validating configuration store. - */ -public class ConfigurationStoreTest { - - private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStoreTest.class); - private static final String PROCESS1NAME = "process1"; - private static final String PROCESS2NAME = "process2"; - private static final String PROCESS3NAME = "process3"; - - private ConfigurationStore store = ConfigurationStore.get(); - private TestListener listener = new TestListener(); - - private class TestListener implements ConfigurationChangeListener { - @Override - public void onAdd(Entity entity) throws FalconException { - throw new FalconException("For test"); - } - - @Override - public void onRemove(Entity entity) throws FalconException { - throw new FalconException("For test"); - } - - @Override - public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { - throw new FalconException("For test"); - } - - @Override - public void onReload(Entity entity) throws FalconException { - throw new FalconException("For test"); - } - } - - - @BeforeClass - public void setUp() throws Exception { - System.out.println("in beforeMethod"); - Process process1 = new Process(); - process1.setName(PROCESS1NAME); - 
store.publish(EntityType.PROCESS, process1); - - Process process2 = new Process(); - process2.setName(PROCESS2NAME); - store.publish(EntityType.PROCESS, process2); - - Process process3 = new Process(); - process3.setName(PROCESS3NAME); - store.publish(EntityType.PROCESS, process3); - } - - @Test - public void testPublish() throws Exception { - Process process = new Process(); - process.setName("hello"); - store.publish(EntityType.PROCESS, process); - Process p = store.get(EntityType.PROCESS, "hello"); - Assert.assertEquals(p, process); - - store.registerListener(listener); - process.setName("world"); - try { - store.publish(EntityType.PROCESS, process); - throw new AssertionError("Expected exception"); - } catch(FalconException expected) { - //expected - } - store.unregisterListener(listener); - } - - @Test - public void testGet() throws Exception { - Process p = store.get(EntityType.PROCESS, "notfound"); - Assert.assertNull(p); - } - - @Test - public void testRemove() throws Exception { - Process process = new Process(); - process.setName("remove"); - store.publish(EntityType.PROCESS, process); - - Process p = store.get(EntityType.PROCESS, "remove"); - Assert.assertEquals(p, process); - store.remove(EntityType.PROCESS, "remove"); - p = store.get(EntityType.PROCESS, "remove"); - Assert.assertNull(p); - - store.publish(EntityType.PROCESS, process); - store.registerListener(listener); - try { - store.remove(EntityType.PROCESS, "remove"); - throw new AssertionError("Expected exception"); - } catch(FalconException expected) { - //expected - } - store.unregisterListener(listener); - } - - - @Test(threadPoolSize = 3, invocationCount = 6) - public void testConcurrentRemoveOfSameProcess() throws Exception { - store.remove(EntityType.PROCESS, PROCESS1NAME); - Process p = store.get(EntityType.PROCESS, PROCESS1NAME); - Assert.assertNull(p); - } - - @Test(threadPoolSize = 3, invocationCount = 6) - public void testConcurrentRemove() throws Exception { - 
store.remove(EntityType.PROCESS, PROCESS2NAME); - Process p1 = store.get(EntityType.PROCESS, PROCESS2NAME); - Assert.assertNull(p1); - - store.remove(EntityType.PROCESS, PROCESS3NAME); - Process p2 = store.get(EntityType.PROCESS, PROCESS3NAME); - Assert.assertNull(p2); - } - - @BeforeSuite - @AfterSuite - public void cleanup() throws IOException { - Path path = new Path(StartupProperties.get(). - getProperty("config.store.uri")); - FileSystem fs = FileSystem.get(path.toUri(), new Configuration()); - fs.delete(path, true); - LOG.info("Cleaned up {}", path); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java deleted file mode 100644 index 033a55b..0000000 --- a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.store; - -import org.apache.commons.io.FileUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.AbstractTestBase; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.feed.CatalogTable; -import org.apache.falcon.entity.v0.feed.Cluster; -import org.apache.falcon.entity.v0.feed.Clusters; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.entity.v0.feed.Locations; -import org.apache.falcon.security.CurrentUser; -import org.apache.falcon.util.FalconRadixUtils; -import org.apache.falcon.util.FalconTestUtil; -import org.apache.falcon.util.StartupProperties; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.io.File; -import java.net.URI; -import java.util.Collection; - - -/** - * Tests for FeedLocationStore. 
- */ -public class FeedLocationStoreTest extends AbstractTestBase { - private ConfigurationStore store; - - - @BeforeClass - public void initConfigStore() throws Exception { - String configPath = new URI(StartupProperties.get().getProperty("config.store.uri")).getPath(); - String location = configPath + "-" + getClass().getName(); - StartupProperties.get().setProperty("config.store.uri", location); - FileUtils.deleteDirectory(new File(location)); - - cleanupStore(); - String listeners = StartupProperties.get().getProperty("configstore.listeners"); - listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", ""); - listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService", ""); - StartupProperties.get().setProperty("configstore.listeners", listeners); - store = ConfigurationStore.get(); - store.init(); - - CurrentUser.authenticate(FalconTestUtil.TEST_USER_2); - - } - @BeforeMethod - public void setUp() throws FalconException{ - cleanupStore(); - createClusters(); - } - - @AfterMethod - public void print() { - System.out.printf("%s", FeedLocationStore.get().store); - } - - @Test - public void testOnAddSameLocation() throws FalconException{ - Feed f1 = createFeed("f1SameLocations"); - int initialSize = FeedLocationStore.get().store.getSize(); - f1.getLocations().getLocations().add(createLocation(LocationType.DATA, - "/projects/cas/data/hourly/2014/09/09/09")); - f1.getLocations().getLocations().add(createLocation(LocationType.STATS, - "/projects/cas/stats/hourly/2014/09/09/09")); - - Feed f2 = createFeed("f2SameLocations"); - f2.getLocations().getLocations().add(createLocation(LocationType.STATS, - "/projects/cas/data/hourly/2014/09/09/09")); - f2.getLocations().getLocations().add(createLocation(LocationType.DATA, - "/projects/cas/stats/hourly/2014/09/09/09")); - - store.publish(EntityType.FEED, f1); - store.publish(EntityType.FEED, f2); - int finalSize = FeedLocationStore.get().store.getSize(); - 
Assert.assertEquals(finalSize - initialSize, 8); - } - - @Test - public void testOnRemove() throws FalconException{ - int initialSize = FeedLocationStore.get().store.getSize(); - - Feed f1 = createFeed("f1ForRemove"); - f1.getLocations().getLocations().add(createLocation(LocationType.DATA, - "/projects/cas/data/hourly/2014/09/09/09")); - f1.getLocations().getLocations().add(createLocation(LocationType.STATS, - "/projects/cas/data/hourly/2014/09/09/09")); - - store.publish(EntityType.FEED, f1); - Assert.assertEquals(FeedLocationStore.get().store.getSize() - initialSize, 4); - store.remove(EntityType.FEED, "f1ForRemove"); - Assert.assertEquals(FeedLocationStore.get().store.getSize(), initialSize); - - } - - - @Test - public void testOnChange() throws FalconException{ - Feed f1 = createFeed("f1"); - f1.getLocations().getLocations().add(createLocation(LocationType.DATA, - "/projects/cas/data/hourly/2014/09/09/09")); - store.publish(EntityType.FEED, f1); - - Feed f2 = createFeed("f1"); - f2.getLocations().getLocations().add(createLocation(LocationType.DATA, - "/projects/cas/data/monthly")); - store.initiateUpdate(f2); - store.update(EntityType.FEED, f2); - store.cleanupUpdateInit(); - - Feed f3 = createFeed("f2"); - f3.getLocations().getLocations().add(createLocation(LocationType.STATS, - "/projects/cas/data/hourly/2014/09/09/09")); - store.publish(EntityType.FEED, f3); - - } - - @Test - public void testWithClusterLocations() throws FalconException { - Feed f = createFeedWithClusterLocations("clusterFeed"); - int initialSize = FeedLocationStore.get().store.getSize(); - store.publish(EntityType.FEED, f); - Assert.assertEquals(FeedLocationStore.get().store.getSize() - initialSize, 6); - store.remove(EntityType.FEED, "clusterFeed"); - Assert.assertEquals(FeedLocationStore.get().store.getSize(), initialSize); - } - - - @Test - public void testFindWithRegularExpression() throws FalconException { - Feed f = createFeed("findUsingRegexFeed"); - 
f.getLocations().getLocations().add(createLocation(LocationType.DATA, - "/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}")); - store.publish(EntityType.FEED, f); - Assert.assertNotNull(FeedLocationStore.get().store.find("/falcon/test/input/2014/12/12/23", - new FalconRadixUtils.FeedRegexAlgorithm())); - } - - @Test - public void testAddCatalogStorageFeeds() throws FalconException { - //this test ensure that catalog feeds are ignored in FeedLocationStore - Feed f = createCatalogFeed("catalogFeed"); - store.publish(EntityType.FEED, f); - Assert.assertTrue(true); - } - - private Feed createCatalogFeed(String name) { - Feed f = new Feed(); - f.setName(name); - f.setClusters(createBlankClusters()); - f.setTable(new CatalogTable()); - return f; - } - - private Feed createFeed(String name){ - Feed f = new Feed(); - Locations locations = new Locations(); - f.setLocations(locations); - f.setName(name); - f.setClusters(createBlankClusters()); - return f; - } - - - private Feed createFeedWithClusterLocations(String name) { - Feed f = new Feed(); - f.setLocations(new Locations()); - f.getLocations().getLocations().add(createLocation(LocationType.DATA, "/projects/cas/data")); - f.getLocations().getLocations().add(createLocation(LocationType.STATS, "/projects/cas/stats")); - f.getLocations().getLocations().add(createLocation(LocationType.META, "/projects/cas/meta")); - f.setName(name); - f.setClusters(createClustersWithLocations()); - return f; - } - - private Location createLocation(LocationType type, String path){ - Location location = new Location(); - location.setPath(path); - location.setType(type); - return location; - } - - protected void cleanupStore() throws FalconException { - store = ConfigurationStore.get(); - for (EntityType type : EntityType.values()) { - Collection<String> entities = store.getEntities(type); - for (String entity : entities) { - store.remove(type, entity); - } - } - } - - private Clusters createClustersWithLocations() { - Clusters clusters = new 
Clusters(); - Cluster cluster1 = new Cluster(); - cluster1.setName("cluster1WithLocations"); - cluster1.setLocations(new Locations()); - cluster1.getLocations().getLocations().add(createLocation(LocationType.DATA, "/projects/cas/cluster1/data")); - cluster1.getLocations().getLocations().add(createLocation(LocationType.STATS, "/projects/cas/cluster1/stats")); - cluster1.getLocations().getLocations().add(createLocation(LocationType.META, "/projects/cas/cluster1/meta")); - - Cluster cluster2 = new Cluster(); - cluster2.setName("cluster2WithLocations"); - cluster2.setLocations(new Locations()); - cluster2.getLocations().getLocations().add(createLocation(LocationType.DATA, "/projects/cas/cluster2/data")); - cluster2.getLocations().getLocations().add(createLocation(LocationType.STATS, "/projects/cas/cluster2/stats")); - cluster2.getLocations().getLocations().add(createLocation(LocationType.META, "/projects/cas/cluster2/meta")); - - clusters.getClusters().add(cluster1); - clusters.getClusters().add(cluster2); - - return clusters; - } - - private Clusters createBlankClusters() { - Clusters clusters = new Clusters(); - - Cluster cluster = new Cluster(); - cluster.setName("blankCluster1"); - clusters.getClusters().add(cluster); - - Cluster cluster2 = new Cluster(); - cluster2.setName("blankCluster2"); - clusters.getClusters().add(cluster2); - - return clusters; - } - - private void createClusters() throws FalconException { - String[] clusterNames = {"cluster1WithLocations", "cluster2WithLocations", "blankCluster1", "blankCluster2"}; - for (String name : clusterNames) { - org.apache.falcon.entity.v0.cluster.Cluster cluster = new org.apache.falcon.entity.v0.cluster.Cluster(); - cluster.setName(name); - cluster.setColo("default"); - store.publish(EntityType.CLUSTER, cluster); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java 
---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java b/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java deleted file mode 100644 index 23f69d7..0000000 --- a/common/src/test/java/org/apache/falcon/entity/v0/EntityGraphTest.java +++ /dev/null @@ -1,407 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.entity.v0; - -import org.apache.falcon.entity.AbstractTestBase; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.feed.Argument; -import org.apache.falcon.entity.v0.feed.Arguments; -import org.apache.falcon.entity.v0.feed.Clusters; -import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.entity.v0.feed.Extract; -import org.apache.falcon.entity.v0.feed.ExtractMethod; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.FieldsType; -import org.apache.falcon.entity.v0.feed.FieldIncludeExclude; -import org.apache.falcon.entity.v0.feed.Import; -import org.apache.falcon.entity.v0.feed.MergeType; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.datasource.Datasource; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Inputs; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Outputs; -import org.apache.falcon.entity.v0.process.Process; -import org.testng.Assert; -import org.testng.annotations.Test; - -import java.util.List; -import java.util.Set; - -/** - * Entity graph tests. 
- */ -public class EntityGraphTest extends AbstractTestBase { - - private ConfigurationStore store = ConfigurationStore.get(); - - private EntityGraph graph = EntityGraph.get(); - - @Test - public void testOnAdd() throws Exception { - - Process process = new Process(); - process.setName("p1"); - Cluster cluster = new Cluster(); - cluster.setName("c1"); - cluster.setColo("1"); - Feed f1 = addInput(process, "f1", cluster); - Feed f2 = addInput(process, "f2", cluster); - Feed f3 = addOutput(process, "f3", cluster); - Feed f4 = addOutput(process, "f4", cluster); - org.apache.falcon.entity.v0.process.Cluster processCluster = new org.apache.falcon.entity.v0.process.Cluster(); - processCluster.setName("c1"); - process.setClusters(new org.apache.falcon.entity.v0.process.Clusters()); - process.getClusters().getClusters().add(processCluster); - - store.publish(EntityType.CLUSTER, cluster); - store.publish(EntityType.FEED, f1); - store.publish(EntityType.FEED, f2); - store.publish(EntityType.FEED, f3); - store.publish(EntityType.FEED, f4); - store.publish(EntityType.PROCESS, process); - - Set<Entity> entities = graph.getDependents(process); - Assert.assertEquals(entities.size(), 5); - Assert.assertTrue(entities.contains(cluster)); - Assert.assertTrue(entities.contains(f1)); - Assert.assertTrue(entities.contains(f2)); - Assert.assertTrue(entities.contains(f3)); - Assert.assertTrue(entities.contains(f4)); - - entities = graph.getDependents(f1); - Assert.assertEquals(entities.size(), 2); - Assert.assertTrue(entities.contains(process)); - Assert.assertTrue(entities.contains(cluster)); - - entities = graph.getDependents(f2); - Assert.assertEquals(entities.size(), 2); - Assert.assertTrue(entities.contains(process)); - Assert.assertTrue(entities.contains(cluster)); - - entities = graph.getDependents(f3); - Assert.assertEquals(entities.size(), 2); - Assert.assertTrue(entities.contains(process)); - Assert.assertTrue(entities.contains(cluster)); - - entities = graph.getDependents(f4); 
- Assert.assertEquals(entities.size(), 2); - Assert.assertTrue(entities.contains(process)); - Assert.assertTrue(entities.contains(cluster)); - - entities = graph.getDependents(cluster); - Assert.assertEquals(entities.size(), 5); - Assert.assertTrue(entities.contains(process)); - Assert.assertTrue(entities.contains(f1)); - Assert.assertTrue(entities.contains(f2)); - Assert.assertTrue(entities.contains(f3)); - Assert.assertTrue(entities.contains(f4)); - } - - private Feed addInput(Process process, String feed, Cluster cluster) { - if (process.getInputs() == null) { - process.setInputs(new Inputs()); - } - Inputs inputs = process.getInputs(); - Input input = new Input(); - input.setFeed(feed); - inputs.getInputs().add(input); - Feed f1 = new Feed(); - f1.setName(feed); - Clusters clusters = new Clusters(); - f1.setClusters(clusters); - org.apache.falcon.entity.v0.feed.Cluster feedCluster = - new org.apache.falcon.entity.v0.feed.Cluster(); - feedCluster.setName(cluster.getName()); - clusters.getClusters().add(feedCluster); - return f1; - } - - private Feed addFeedImport(String feed, Cluster cluster, Datasource ds) { - - Feed f1 = new Feed(); - f1.setName(feed); - org.apache.falcon.entity.v0.feed.Cluster feedCluster = - new org.apache.falcon.entity.v0.feed.Cluster(); - feedCluster.setName(cluster.getName()); - feedCluster.setType(ClusterType.SOURCE); - Clusters clusters = new Clusters(); - clusters.getClusters().add(feedCluster); - f1.setClusters(clusters); - - Import imp = getAnImport(MergeType.SNAPSHOT, ds); - f1.getClusters().getClusters().get(0).setImport(imp); - return f1; - } - - private Import getAnImport(MergeType mergeType, Datasource ds) { - Extract extract = new Extract(); - extract.setType(ExtractMethod.FULL); - extract.setMergepolicy(mergeType); - - FieldsType fields = new FieldsType(); - FieldIncludeExclude fieldInclude = new FieldIncludeExclude(); - fieldInclude.getFields().add("id"); - fieldInclude.getFields().add("name"); - 
fields.setIncludes(fieldInclude); - - org.apache.falcon.entity.v0.feed.Datasource source = new org.apache.falcon.entity.v0.feed.Datasource(); - source.setName(ds.getName()); - source.setTableName("test-table"); - source.setExtract(extract); - source.setFields(fields); - - Argument a1 = new Argument(); - a1.setName("--split_by"); - a1.setValue("id"); - Argument a2 = new Argument(); - a2.setName("--num-mappers"); - a2.setValue("2"); - Arguments args = new Arguments(); - List<Argument> argList = args.getArguments(); - argList.add(a1); - argList.add(a2); - - Import imp = new Import(); - imp.setSource(source); - imp.setArguments(args); - return imp; - } - - private void attachInput(Process process, Feed feed) { - if (process.getInputs() == null) { - process.setInputs(new Inputs()); - } - Inputs inputs = process.getInputs(); - Input input = new Input(); - input.setFeed(feed.getName()); - inputs.getInputs().add(input); - } - - private Feed addOutput(Process process, String feed, Cluster cluster) { - if (process.getOutputs() == null) { - process.setOutputs(new Outputs()); - } - Outputs outputs = process.getOutputs(); - Output output = new Output(); - output.setFeed(feed); - outputs.getOutputs().add(output); - Feed f1 = new Feed(); - f1.setName(feed); - Clusters clusters = new Clusters(); - f1.setClusters(clusters); - org.apache.falcon.entity.v0.feed.Cluster feedCluster = - new org.apache.falcon.entity.v0.feed.Cluster(); - feedCluster.setName(cluster.getName()); - clusters.getClusters().add(feedCluster); - return f1; - } - - @Test - public void testOnRemove() throws Exception { - Process process = new Process(); - process.setName("rp1"); - Cluster cluster = new Cluster(); - cluster.setName("rc1"); - cluster.setColo("2"); - org.apache.falcon.entity.v0.process.Cluster processCluster = new org.apache.falcon.entity.v0.process.Cluster(); - processCluster.setName("rc1"); - process.setClusters(new org.apache.falcon.entity.v0.process.Clusters()); - 
process.getClusters().getClusters().add(processCluster); - - store.publish(EntityType.CLUSTER, cluster); - store.publish(EntityType.PROCESS, process); - - Set<Entity> entities = graph.getDependents(process); - Assert.assertEquals(entities.size(), 1); - Assert.assertTrue(entities.contains(cluster)); - - entities = graph.getDependents(cluster); - Assert.assertEquals(entities.size(), 1); - Assert.assertTrue(entities.contains(process)); - - store.remove(EntityType.PROCESS, process.getName()); - entities = graph.getDependents(cluster); - Assert.assertTrue(entities == null); - - entities = graph.getDependents(process); - Assert.assertTrue(entities == null); - } - - @Test - public void testOnRemove2() throws Exception { - - Process p1 = new Process(); - p1.setName("ap1"); - Process p2 = new Process(); - p2.setName("ap2"); - Cluster cluster = new Cluster(); - cluster.setName("ac1"); - cluster.setColo("3"); - Feed f1 = addInput(p1, "af1", cluster); - Feed f3 = addOutput(p1, "af3", cluster); - Feed f2 = addOutput(p2, "af2", cluster); - attachInput(p2, f3); - org.apache.falcon.entity.v0.process.Cluster processCluster = new org.apache.falcon.entity.v0.process.Cluster(); - processCluster.setName("ac1"); - p1.setClusters(new org.apache.falcon.entity.v0.process.Clusters()); - p1.getClusters().getClusters().add(processCluster); - processCluster = new org.apache.falcon.entity.v0.process.Cluster(); - processCluster.setName("ac1"); - p2.setClusters(new org.apache.falcon.entity.v0.process.Clusters()); - p2.getClusters().getClusters().add(processCluster); - - store.publish(EntityType.CLUSTER, cluster); - store.publish(EntityType.FEED, f1); - store.publish(EntityType.FEED, f2); - store.publish(EntityType.FEED, f3); - store.publish(EntityType.PROCESS, p1); - store.publish(EntityType.PROCESS, p2); - - Set<Entity> entities = graph.getDependents(p1); - Assert.assertEquals(entities.size(), 3); - Assert.assertTrue(entities.contains(cluster)); - Assert.assertTrue(entities.contains(f1)); - 
Assert.assertTrue(entities.contains(f3)); - - entities = graph.getDependents(p2); - Assert.assertEquals(entities.size(), 3); - Assert.assertTrue(entities.contains(cluster)); - Assert.assertTrue(entities.contains(f2)); - Assert.assertTrue(entities.contains(f3)); - - entities = graph.getDependents(f1); - Assert.assertEquals(entities.size(), 2); - Assert.assertTrue(entities.contains(p1)); - Assert.assertTrue(entities.contains(cluster)); - - entities = graph.getDependents(f2); - Assert.assertEquals(entities.size(), 2); - Assert.assertTrue(entities.contains(p2)); - Assert.assertTrue(entities.contains(cluster)); - - entities = graph.getDependents(f3); - Assert.assertEquals(entities.size(), 3); - Assert.assertTrue(entities.contains(p2)); - Assert.assertTrue(entities.contains(p1)); - Assert.assertTrue(entities.contains(cluster)); - - entities = graph.getDependents(cluster); - Assert.assertEquals(entities.size(), 5); - Assert.assertTrue(entities.contains(p1)); - Assert.assertTrue(entities.contains(p2)); - Assert.assertTrue(entities.contains(f1)); - Assert.assertTrue(entities.contains(f2)); - Assert.assertTrue(entities.contains(f3)); - - store.remove(EntityType.PROCESS, p2.getName()); - store.remove(EntityType.FEED, f2.getName()); - - entities = graph.getDependents(p1); - Assert.assertEquals(entities.size(), 3); - Assert.assertTrue(entities.contains(cluster)); - Assert.assertTrue(entities.contains(f1)); - Assert.assertTrue(entities.contains(f3)); - - entities = graph.getDependents(p2); - Assert.assertTrue(entities == null); - - entities = graph.getDependents(f1); - Assert.assertEquals(entities.size(), 2); - Assert.assertTrue(entities.contains(p1)); - Assert.assertTrue(entities.contains(cluster)); - - entities = graph.getDependents(f2); - Assert.assertTrue(entities == null); - - entities = graph.getDependents(f3); - Assert.assertEquals(entities.size(), 2); - Assert.assertTrue(entities.contains(p1)); - Assert.assertTrue(entities.contains(cluster)); - - entities = 
graph.getDependents(cluster); - Assert.assertEquals(entities.size(), 3); - Assert.assertTrue(entities.contains(p1)); - Assert.assertTrue(entities.contains(f1)); - Assert.assertTrue(entities.contains(f3)); - } - - @Test - public void testOnChange() throws Exception { - } - - @Test - public void testOnAddImport() throws Exception { - - Datasource ds = new Datasource(); - ds.setName("test-db"); - ds.setColo("c1"); - - Cluster cluster = new Cluster(); - cluster.setName("ci1"); - cluster.setColo("c1"); - - Feed f1 = addFeedImport("fi1", cluster, ds); - - store.publish(EntityType.CLUSTER, cluster); - store.publish(EntityType.DATASOURCE, ds); - store.publish(EntityType.FEED, f1); - - Set<Entity> entities = graph.getDependents(cluster); - Assert.assertEquals(entities.size(), 1); - Assert.assertTrue(entities.contains(f1)); - - entities = graph.getDependents(ds); - Assert.assertEquals(entities.size(), 1); - Assert.assertTrue(entities.contains(f1)); - - entities = graph.getDependents(f1); - Assert.assertEquals(entities.size(), 2); - Assert.assertTrue(entities.contains(cluster)); - Assert.assertTrue(entities.contains(ds)); - - store.remove(EntityType.FEED, "fi1"); - store.remove(EntityType.DATASOURCE, "test-db"); - store.remove(EntityType.CLUSTER, "ci1"); - } - - @Test - public void testOnRemoveDatasource() throws Exception { - - Datasource ds = new Datasource(); - ds.setName("test-db"); - ds.setColo("c1"); - - Cluster cluster = new Cluster(); - cluster.setName("ci1"); - cluster.setColo("c1"); - - Feed f1 = addFeedImport("fi1", cluster, ds); - - store.publish(EntityType.CLUSTER, cluster); - store.publish(EntityType.DATASOURCE, ds); - store.publish(EntityType.FEED, f1); - - store.remove(EntityType.DATASOURCE, "test-db"); - - Set<Entity> entities = graph.getDependents(f1); - Assert.assertEquals(1, entities.size()); - Assert.assertTrue(entities.contains(cluster)); - } -} 
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java b/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java deleted file mode 100644 index da5dbca..0000000 --- a/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.expression; - -import org.apache.falcon.FalconException; -import org.testng.Assert; -import org.testng.annotations.BeforeTest; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.text.ParseException; -import java.util.Date; - -/** - * Unit test cases for EL Expressions. 
- */ -public class ExpressionHelperTest { - - private ExpressionHelper expressionHelper = ExpressionHelper.get(); - - @BeforeTest - public void init() throws ParseException { - Date referenceDate = ExpressionHelper.FORMATTER.get().parse("2015-02-01T00:00Z"); - expressionHelper.setReferenceDate(referenceDate); - } - - @Test(dataProvider = "ElExpressions") - public void testStartOffset(String expression, String expectedDateStr) throws FalconException { - Date evalDate = expressionHelper.evaluate(expression, Date.class); - String evalDateStr = ExpressionHelper.FORMATTER.get().format(evalDate); - Assert.assertEquals(evalDateStr, expectedDateStr); - } - - - @DataProvider(name = "ElExpressions") - public Object[][] createOffsets() { - return new Object[][] { - {"now(-10,-30)", "2015-01-31T13:30Z"}, - {"now(10,-30)", "2015-02-01T09:30Z"}, - - {"today(0,0)", "2015-02-01T00:00Z"}, - {"today(-1,0)", "2015-01-31T23:00Z"}, - {"yesterday(0,0)", "2015-01-31T00:00Z"}, - {"yesterday(-1,0)", "2015-01-30T23:00Z"}, - {"yesterday(1,30)", "2015-01-31T01:30Z"}, - - {"currentMonth(2,0,0)", "2015-02-03T00:00Z"}, - {"currentMonth(-2,1,30)", "2015-01-30T01:30Z"}, - {"lastMonth(3,0,0)", "2015-01-04T00:00Z"}, - {"lastMonth(-3,0,0)", "2014-12-29T00:00Z"}, - - {"currentWeek('THU',0,0)", "2015-01-29T00:00Z"}, - {"currentWeek('SUN',0,0)", "2015-02-01T00:00Z"}, - {"lastWeek('THU',0,0)", "2015-01-22T00:00Z"}, - {"lastWeek('SUN',0,0)", "2015-01-25T00:00Z"}, - - {"currentYear(1,1,0,0)", "2015-02-02T00:00Z"}, - {"currentYear(-1,1,0,0)", "2014-12-02T00:00Z"}, - {"lastYear(1,1,0,0)", "2014-02-02T00:00Z"}, - {"lastYear(-1,1,0,0)", "2013-12-02T00:00Z"}, - - // latest and future will return the reference time - {"latest(0)", "2015-02-01T00:00Z"}, - {"latest(-1)", "2015-02-01T00:00Z"}, - {"future(0,0)", "2015-02-01T00:00Z"}, - {"future(1,0)", "2015-02-01T00:00Z"}, - }; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/group/FeedGroupMapTest.java 
---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/group/FeedGroupMapTest.java b/common/src/test/java/org/apache/falcon/group/FeedGroupMapTest.java deleted file mode 100644 index a6c52e3..0000000 --- a/common/src/test/java/org/apache/falcon/group/FeedGroupMapTest.java +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.group; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.AbstractTestBase; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; -import org.apache.falcon.entity.v0.feed.Locations; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import javax.xml.bind.JAXBException; -import java.util.Map; - -/** - * Feed group map tests. 
- */ -public class FeedGroupMapTest extends AbstractTestBase { - private static Cluster cluster; - - @BeforeClass - public void setUp() throws Exception { - cluster = (Cluster) EntityType.CLUSTER - .getUnmarshaller() - .unmarshal( - FeedGroupMapTest.class - .getResourceAsStream("/config/cluster/cluster-0.1.xml")); - } - - @BeforeMethod - public void cleanup() throws Exception { - cleanupStore(); - } - - @Test - public void testOnAdd() throws FalconException, JAXBException { - getStore().publish(EntityType.CLUSTER, cluster); - Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal( - FeedGroupMapTest.class - .getResourceAsStream("/config/feed/feed-0.1.xml")); - feed1.setName("f1"); - feed1.setGroups("group1,group2,group3"); - Location location = new Location(); - location.setPath("/projects/bi/rmc/daily/ad/${YEAR}/fraud/${MONTH}-${DAY}/ad"); - location.setType(LocationType.DATA); - feed1.setLocations(new Locations()); - feed1.getLocations().getLocations().add(location); - getStore().publish(EntityType.FEED, feed1); - Map<String, FeedGroup> groupMapping = FeedGroupMap.get() - .getGroupsMapping(); - - FeedGroup group = groupMapping.get("group1"); - Assert.assertEquals(group.getName(), "group1"); - Assert.assertEquals(group.getFeeds().size(), 1); - assertFields(group, feed1); - - group = groupMapping.get("group2"); - Assert.assertEquals(group.getName(), "group2"); - Assert.assertEquals(group.getFeeds().size(), 1); - assertFields(group, feed1); - - group = groupMapping.get("group3"); - Assert.assertEquals(group.getName(), "group3"); - Assert.assertEquals(group.getFeeds().size(), 1); - assertFields(group, feed1); - - Feed feed2 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal( - FeedGroupMapTest.class - .getResourceAsStream("/config/feed/feed-0.1.xml")); - - feed2.setName("f2"); - feed2.setGroups("group1,group5,group3"); - location.setPath("/projects/bi/rmc/daily/ad/${YEAR}/${MONTH}/${DAY}/ad2"); - location.setType(LocationType.DATA); - 
feed2.setLocations(new Locations()); - feed2.getLocations().getLocations().add(location); - getStore().publish(EntityType.FEED, feed2); - groupMapping = FeedGroupMap.get().getGroupsMapping(); - - group = groupMapping.get("group1"); - Assert.assertEquals(group.getName(), "group1"); - Assert.assertEquals(group.getFeeds().size(), 2); - assertFields(group, feed2); - - group = groupMapping.get("group2"); - Assert.assertEquals(group.getName(), "group2"); - Assert.assertEquals(group.getFeeds().size(), 1); - assertFields(group, feed2); - - group = groupMapping.get("group3"); - Assert.assertEquals(group.getName(), "group3"); - Assert.assertEquals(group.getFeeds().size(), 2); - assertFields(group, feed2); - - group = groupMapping.get("group5"); - Assert.assertEquals(group.getName(), "group5"); - Assert.assertEquals(group.getFeeds().size(), 1); - assertFields(group, feed2); - - } - - @Test - public void testOnRemove() throws FalconException, JAXBException { - Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal( - FeedGroupMapTest.class - .getResourceAsStream("/config/feed/feed-0.1.xml")); - feed1.setName("f1"); - getStore().publish(EntityType.CLUSTER, cluster); - feed1.setGroups("group7,group8,group9"); - Location location = new Location(); - location.setPath("/projects/bi/rmc/daily/ad/${YEAR}/fraud/${MONTH}-${DAY}/ad"); - location.setType(LocationType.DATA); - feed1.setLocations(new Locations()); - feed1.getLocations().getLocations().add(location); - getStore().publish(EntityType.FEED, feed1); - - Feed feed2 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal( - FeedGroupMapTest.class - .getResourceAsStream("/config/feed/feed-0.1.xml")); - feed2.setName("f2"); - feed2.setGroups("group7,group8,group10"); - location.setPath("/projects/bi/rmc/daily/ad/${YEAR}/${MONTH}/${DAY}/ad2"); - location.setType(LocationType.DATA); - feed2.setLocations(new Locations()); - feed2.getLocations().getLocations().add(location); - getStore().publish(EntityType.FEED, feed2); - - 
Map<String, FeedGroup> groupMapping = FeedGroupMap.get() - .getGroupsMapping(); - - getStore().remove(EntityType.FEED, "f2"); - - FeedGroup group = groupMapping.get("group7"); - Assert.assertEquals(group.getName(), "group7"); - Assert.assertEquals(group.getFeeds().size(), 1); - - group = groupMapping.get("group8"); - Assert.assertEquals(group.getName(), "group8"); - Assert.assertEquals(group.getFeeds().size(), 1); - - group = groupMapping.get("group10"); - Assert.assertEquals(null, group); - - getStore().remove(EntityType.FEED, "f1"); - - group = groupMapping.get("group7"); - Assert.assertEquals(null, group); - - group = groupMapping.get("group8"); - Assert.assertEquals(null, group); - - group = groupMapping.get("group9"); - Assert.assertEquals(null, group); - - } - - @Test - public void testNullGroup() throws FalconException, JAXBException { - Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal( - FeedGroupMapTest.class - .getResourceAsStream("/config/feed/feed-0.1.xml")); - feed1.setName("f5" + System.currentTimeMillis()); - getStore().publish(EntityType.CLUSTER, cluster); - feed1.setGroups(null); - Location location = new Location(); - location.setPath("/projects/bi/rmc/daily/ad/${YEAR}/fraud/${MONTH}-${DAY}/ad"); - location.setType(LocationType.DATA); - feed1.setLocations(new Locations()); - feed1.getLocations().getLocations().add(location); - getStore().publish(EntityType.FEED, feed1); - - } - - private void assertFields(FeedGroup group, Feed feed) { - Assert.assertEquals(group.getFrequency(), feed.getFrequency()); - Assert.assertEquals(group.getDatePattern(), - "[${DAY}, ${MONTH}, ${YEAR}]"); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java 
b/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java deleted file mode 100644 index 6ef2710..0000000 --- a/common/src/test/java/org/apache/falcon/hadoop/HadoopClientFactoryTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hadoop; - -import org.apache.falcon.cluster.util.EmbeddedCluster; -import org.apache.falcon.security.CurrentUser; -import org.apache.falcon.util.FalconTestUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.UserGroupInformation; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import java.net.URI; - -/** - * Unit tests for HadoopClientFactory that doles out FileSystem handles. 
- */ -public class HadoopClientFactoryTest { - - private EmbeddedCluster embeddedCluster; - - @BeforeClass - public void setUp() throws Exception { - embeddedCluster = EmbeddedCluster.newCluster(getClass().getSimpleName()); - } - - @AfterClass - public void tearDown() throws Exception { - if (embeddedCluster != null) { - embeddedCluster.shutdown(); - } - } - - @Test - public void testGet() throws Exception { - HadoopClientFactory clientFactory = HadoopClientFactory.get(); - Assert.assertNotNull(clientFactory); - } - - @Test (enabled = false) // todo: cheated the conf to impersonate as same user - public void testCreateFileSystemWithSameUser() { - String user = System.getProperty("user.name"); - CurrentUser.authenticate(user); - try { - Configuration conf = embeddedCluster.getConf(); - URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)); - Assert.assertNotNull(uri); - HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUGI(), uri, conf); - Assert.fail("Impersonation should have failed."); - } catch (Exception e) { - Assert.assertEquals(e.getCause().getClass(), RemoteException.class); - } - } - - @Test - public void testCreateFileSystem() throws Exception { - Configuration conf = embeddedCluster.getConf(); - - UserGroupInformation.setConfiguration(conf); - UserGroupInformation realUser = UserGroupInformation.createUserForTesting( - FalconTestUtil.TEST_USER_2, new String[]{"testgroup"}); - UserGroupInformation.createProxyUserForTesting("proxyuser", realUser, new String[]{"proxygroup"}); - - URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)); - Assert.assertNotNull(uri); - FileSystem fs = HadoopClientFactory.get().createFileSystem(realUser, uri, conf); - Assert.assertNotNull(fs); - } - - @Test - public void testCreateFileSystemWithUser() throws Exception { - Configuration conf = embeddedCluster.getConf(); - - UserGroupInformation realUser = UserGroupInformation.createUserForTesting( - FalconTestUtil.TEST_USER_2, new 
String[]{"testgroup"}); - UserGroupInformation.createProxyUserForTesting("proxyuser", realUser, new String[]{"proxygroup"}); - UserGroupInformation.setConfiguration(conf); - - URI uri = new URI(conf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)); - Assert.assertNotNull(uri); - - CurrentUser.authenticate(System.getProperty("user.name")); - FileSystem fs = HadoopClientFactory.get().createFileSystem(CurrentUser.getProxyUGI(), uri, conf); - Assert.assertNotNull(fs); - } -}
