Repository: falcon Updated Branches: refs/heads/master 2d5516ec3 -> db0604da8
FALCON-1552 Migration of ProcessInstanceManagerIT to use falcon unit (Narayan Periwal) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/db0604da Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/db0604da Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/db0604da Branch: refs/heads/master Commit: db0604da89edc4841b44ba9b5197cbc35aaff6e7 Parents: 2d5516e Author: Pallavi Rao <[email protected]> Authored: Thu Nov 19 14:13:42 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Thu Nov 19 14:13:42 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + pom.xml | 4 + .../apache/falcon/unit/FalconUnitTestBase.java | 6 +- unit/src/test/resources/cluster-template.xml | 36 --- .../test/resources/local-cluster-template.xml | 36 +++ webapp/pom.xml | 27 +++ .../resource/ProcessInstanceManagerIT.java | 223 ++++++++++--------- .../apache/falcon/resource/UnitTestContext.java | 155 +++++++++++++ webapp/src/test/resources/sleepWorkflow.xml | 41 ++++ webapp/src/test/resources/startup.properties | 143 ++++++++++++ 10 files changed, 524 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/db0604da/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 18cf582..4f46450 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,8 @@ Trunk (Unreleased) FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1552 Migration of ProcessInstanceManagerIT to use falcon unit (Narayan Periwal via Pallavi Rao) + FALCON-1486 Add Unit Test cases for HiveDR(Peeyush Bishnoi via Ajay Yadava) FALCON-1592 Code Refactoring: Introduce Event type for scheduler events (Ajay Yadava via Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/db0604da/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index dfaf1c1..fad8902 100644 --- a/pom.xml +++ b/pom.xml @@ -1348,6 +1348,10 @@ <argLine>-Djava.security.krb5.realm= -Djava.security.krb5.kdc= -Dhadoop.tmp.dir=${project.build.directory}/tmp-hadoop-${user.name}</argLine> <excludedGroups>${excluded.test.groups}</excludedGroups> + <systemPropertyVariables> + <project.version>${project.version}</project.version> + <project.build.directory>${project.build.directory}</project.build.directory> + </systemPropertyVariables> </configuration> <executions> <execution> http://git-wip-us.apache.org/repos/asf/falcon/blob/db0604da/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index af5a3c5..42cb779 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -94,9 +94,9 @@ public class FalconUnitTestBase { private static final String DEFAULT_COLO = "local"; private static final String CLUSTER = "cluster"; private static final String COLO = "colo"; - private static final String CLUSTER_TEMPLATE = "/cluster-template.xml"; - private static final String STAGING_PATH = "/projects/falcon/staging"; - private static final String WORKING_PATH = "/projects/falcon/working"; + private static final String CLUSTER_TEMPLATE = "/local-cluster-template.xml"; + protected static final String STAGING_PATH = "/projects/falcon/staging"; + protected static final String WORKING_PATH = "/projects/falcon/working"; public static final Pattern VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##"); protected static final int WAIT_TIME = 90000; http://git-wip-us.apache.org/repos/asf/falcon/blob/db0604da/unit/src/test/resources/cluster-template.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/cluster-template.xml b/unit/src/test/resources/cluster-template.xml deleted file mode 100644 index d0c9b24..0000000 --- a/unit/src/test/resources/cluster-template.xml +++ /dev/null @@ -1,36 +0,0 @@ -<?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<cluster colo="##colo##" description="" name="##cluster##" xmlns="uri:falcon:cluster:0.1"> - <interfaces> - <interface type="readonly" endpoint="jail://global:00" - version="0.20.2"/> - <interface type="write" endpoint="jail://global:00" - version="0.20.2"/> - <interface type="execute" endpoint="local" version="0.20.2"/> - <interface type="workflow" endpoint="localoozie" - version="3.1"/> - </interfaces> - <locations> - <location name="staging" path="/projects/falcon/staging"/> - <location name="temp" path="/tmp"/> - <location name="working" path="/projects/falcon/working"/> - </locations> - <properties> - </properties> -</cluster> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/db0604da/unit/src/test/resources/local-cluster-template.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/local-cluster-template.xml b/unit/src/test/resources/local-cluster-template.xml new file mode 100644 index 0000000..d0c9b24 --- /dev/null +++ b/unit/src/test/resources/local-cluster-template.xml @@ -0,0 +1,36 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<cluster colo="##colo##" description="" name="##cluster##" xmlns="uri:falcon:cluster:0.1"> + <interfaces> + <interface type="readonly" endpoint="jail://global:00" + version="0.20.2"/> + <interface type="write" endpoint="jail://global:00" + version="0.20.2"/> + <interface type="execute" endpoint="local" version="0.20.2"/> + <interface type="workflow" endpoint="localoozie" + version="3.1"/> + </interfaces> + <locations> + <location name="staging" path="/projects/falcon/staging"/> + <location name="temp" path="/tmp"/> + <location name="working" path="/projects/falcon/working"/> + </locations> + <properties> + </properties> +</cluster> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/db0604da/webapp/pom.xml ---------------------------------------------------------------------- diff --git a/webapp/pom.xml b/webapp/pom.xml index 0999c36..428f67e 100644 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -88,6 +88,21 @@ <dependencies> <dependency> <groupId>org.apache.falcon</groupId> + <artifactId>falcon-unit</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-unit</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> <artifactId>falcon-common</artifactId> <version>${project.version}</version> <type>test-jar</type> @@ -541,6 +556,18 @@ </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19</version> + <configuration> + <systemPropertyVariables> + <project.version>${project.version}</project.version> + <project.build.directory>${project.build.directory}</project.build.directory> + </systemPropertyVariables> + </configuration> + </plugin> + <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> http://git-wip-us.apache.org/repos/asf/falcon/blob/db0604da/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java index ffc203e..8dbbd7d 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java @@ -18,35 +18,45 @@ package org.apache.falcon.resource; -import org.apache.falcon.Tag; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.ExternalId; -import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.commons.io.FileUtils; +import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.resource.InstancesResult.Instance; import org.apache.falcon.resource.InstancesResult.WorkflowStatus; import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.unit.FalconUnitTestBase; import org.apache.falcon.util.OozieTestUtils; -import org.apache.falcon.workflow.engine.OozieClientFactory; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.WorkflowJob; import org.testng.Assert; -import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import javax.ws.rs.core.MediaType; +import java.io.File; +import java.io.IOException; /** * Test class for Process Instance REST API. */ -@Test(enabled = false, groups = {"exhaustive"}) -public class ProcessInstanceManagerIT { +public class ProcessInstanceManagerIT extends FalconUnitTestBase { + private static final String START_INSTANCE = "2012-04-20T00:00Z"; - @AfterClass - public void tearDown() throws Exception { - TestContext.deleteEntitiesFromStore(); + @BeforeClass + @Override + public void setup() throws FalconException, IOException { + String version = System.getProperty("project.version"); + String buildDir = System.getProperty("project.build.directory"); + System.setProperty("falcon.libext", buildDir + "/../../unit/target/falcon-unit-" + version + ".jar"); + super.setup(); + } + + @AfterMethod + @Override + public void cleanUpActionXml() throws IOException, FalconException { + //Needed since oozie writes action xml to current directory. + FileUtils.deleteQuietly(new File("action.xml")); + FileUtils.deleteQuietly(new File(".action.xml.crc")); } protected void schedule(TestContext context) throws Exception { @@ -69,6 +79,7 @@ public class ProcessInstanceManagerIT { .header("Cookie", context.getAuthenticationToken()) .accept(MediaType.APPLICATION_JSON) .get(InstancesResult.class); + Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); @@ -106,150 +117,142 @@ public class ProcessInstanceManagerIT { Assert.assertEquals(processInstance.getStatus(), status); } - //@Test + @Test public void testGetInstanceStatus() throws Exception { - TestContext context = new TestContext(); - schedule(context); - InstancesResult response = context.service.path("api/instance/status/process/" + context.processName) - .queryParam("start", START_INSTANCE) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.APPLICATION_JSON) - .get(InstancesResult.class); + UnitTestContext context = new UnitTestContext(); + submitCluster(context.colo, context.clusterName, null); + context.scheduleProcess(); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); + String endTime = "2012-04-21T00:00Z"; + InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), + context.processName, START_INSTANCE, endTime, context.colo, null, null, "", "", 0, 1, null); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING); } - //@Test + @Test public void testGetInstanceStatusPagination() throws Exception { - TestContext context = new TestContext(); - schedule(context, 4); - - InstancesResult response = context.service.path("api/instance/status/process/" + context.processName) - .queryParam("orderBy", "startTime").queryParam("offset", "0") - .queryParam("numResults", "1").queryParam("filterBy", "STATUS:RUNNING") - .queryParam("start", START_INSTANCE) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.APPLICATION_JSON) - .get(InstancesResult.class); + UnitTestContext context = new UnitTestContext(); + submitCluster(context.colo, context.clusterName, null); + context.scheduleProcessForPagination(); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); + String endTime = "2012-04-20T00:03Z"; + InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), + context.processName, START_INSTANCE, endTime, context.colo, null, "STATUS:RUNNING", "startTime", + "", 0, new Integer(1), null); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); - assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING); + Assert.assertEquals(response.getInstances()[0].getStatus(), WorkflowStatus.RUNNING); } - public void testReRunInstances() throws Exception { - testKillInstances(); - TestContext context = new TestContext(); - InstancesResult response = context.service.path("api/instance/rerun/process/" + context.processName) - .queryParam("start", START_INSTANCE) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.APPLICATION_JSON) - .post(InstancesResult.class); + @Test + public void testKillInstances() throws Exception { + UnitTestContext context = new UnitTestContext(); + submitCluster(context.colo, context.clusterName, null); + context.scheduleProcess(); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); + String endTime = "2012-04-21T00:00Z"; + context.getClient().killInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime, + context.colo, null, null, null, null); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.KILLED); + InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), + context.processName, START_INSTANCE, endTime, context.colo, null, null, "", "", 0, 1, null); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); - assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING); + assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.KILLED); - waitForWorkflow(START_INSTANCE, WorkflowJob.Status.RUNNING); - } + response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), context.processName, + START_INSTANCE, endTime, context.colo, null, "STATUS:KILLED", "startTime", "", 0, 1, null); - public void testKillInstances() throws Exception { - TestContext context = new TestContext(); - schedule(context); - InstancesResult response = context.service.path("api/instance/kill/process/" + context.processName) - .queryParam("start", START_INSTANCE) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.APPLICATION_JSON) - .post(InstancesResult.class); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.KILLED); + } - response = context.service.path("api/instance/kill/process/" + context.processName) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.APPLICATION_JSON) - .post(InstancesResult.class); - Assert.assertEquals(response.getStatus(), APIResult.Status.FAILED); - Assert.assertNotNull(response.getMessage()); + @Test + public void testReRunInstances() throws Exception { + UnitTestContext context = new UnitTestContext(); + submitCluster(context.colo, context.clusterName, null); + context.scheduleProcess(); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); + String endTime = "2012-04-21T00:00Z"; + context.getClient().killInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime, + context.colo, null, null, null, null); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.KILLED); - response = context.service.path("api/instance/status/process/" + context.processName) - .queryParam("orderBy", "startTime").queryParam("filterBy", "STATUS:KILLED") - .queryParam("start", START_INSTANCE) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.APPLICATION_JSON) - .get(InstancesResult.class); + InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), + context.processName, START_INSTANCE, endTime, context.colo, null, null, "", "", 0, 1, null); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.KILLED); - response = context.service.path("api/instance/status/process/" + context.processName) - .queryParam("orderBy", "startTime").queryParam("filterBy", "STATUS:KILLED") - .queryParam("start", START_INSTANCE) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.APPLICATION_JSON) - .get(InstancesResult.class); + context.getClient().rerunInstances(EntityType.PROCESS.name(), context.processName, + START_INSTANCE, endTime, null, context.colo, null, null, null, true, null); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); + + response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), + context.processName, START_INSTANCE, endTime, context.colo, null, null, "", "", 0, 1, null); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); - assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.KILLED); - - waitForWorkflow(START_INSTANCE, WorkflowJob.Status.KILLED); + assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING); } + @Test public void testSuspendInstances() throws Exception { - TestContext context = new TestContext(); - schedule(context); - InstancesResult response = context.service.path("api/instance/suspend/process/" + context.processName) - .queryParam("start", START_INSTANCE) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.APPLICATION_JSON) - .post(InstancesResult.class); + UnitTestContext context = new UnitTestContext(); + submitCluster(context.colo, context.clusterName, null); + context.scheduleProcess(); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); + String endTime = "2012-04-21T00:00Z"; + context.getClient().suspendInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, + endTime, context.colo, context.clusterName, null, null, null); + + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.SUSPENDED); + + InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), + context.processName, START_INSTANCE, endTime, context.colo, null, null, "", "", 0, 1, null); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.SUSPENDED); - - waitForWorkflow(START_INSTANCE, WorkflowJob.Status.SUSPENDED); } + @Test public void testResumesInstances() throws Exception { - testSuspendInstances(); + UnitTestContext context = new UnitTestContext(); + submitCluster(context.colo, context.clusterName, null); + context.scheduleProcess(); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); + String endTime = "2012-04-21T00:00Z"; + context.getClient().suspendInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, + endTime, context.colo, context.clusterName, null, null, null); - TestContext context = new TestContext(); - InstancesResult response = context.service.path("api/instance/resume/process/" + context.processName) - .queryParam("start", START_INSTANCE) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.APPLICATION_JSON) - .post(InstancesResult.class); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.SUSPENDED); + + InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), + context.processName, START_INSTANCE, endTime, context.colo, null, null, "", "", 0, 1, null); Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); Assert.assertNotNull(response.getInstances()); Assert.assertEquals(response.getInstances().length, 1); - assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING); - - waitForWorkflow(START_INSTANCE, WorkflowJob.Status.RUNNING); - } + assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.SUSPENDED); - private void waitForWorkflow(String instance, WorkflowJob.Status status) throws Exception { - TestContext context = new TestContext(); - ExternalId extId = new ExternalId(context.processName, Tag.DEFAULT, EntityUtil.parseDateUTC(instance)); - OozieClient ozClient = OozieClientFactory.get( - (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, context.clusterName)); - String jobId = ozClient.getJobId(extId.getId()); - WorkflowJob jobInfo = null; - for (int i = 0; i < 15; i++) { - jobInfo = ozClient.getJobInfo(jobId); - if (jobInfo.getStatus() == status) { - break; - } - Thread.sleep((i + 1) * 1000); - } + context.getClient().resumeInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, + endTime, context.colo, context.clusterName, null, null, null); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); - Assert.assertNotNull(jobInfo); - Assert.assertEquals(status, jobInfo.getStatus()); + response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), + context.processName, START_INSTANCE, endTime, context.colo, null, null, "", "", 0, 1, null); + Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertNotNull(response.getInstances()); + Assert.assertEquals(response.getInstances().length, 1); + assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/db0604da/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java new file mode 100644 index 0000000..37442c3 --- /dev/null +++ b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.resource; + +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.unit.FalconUnit; +import org.apache.falcon.unit.FalconUnitClient; +import org.apache.falcon.util.DeploymentUtil; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.testng.Assert; + +import java.io.IOException; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +/** + * Test Utility. + */ +public class UnitTestContext { + + public static final String FEED_TEMPLATE1 = "/feed-template1.xml"; + public static final String FEED_TEMPLATE2 = "/feed-template2.xml"; + public static final String PROCESS_TEMPLATE = "/process-template.xml"; + + private static final String START_INSTANCE = "2012-04-20T00:00Z"; + + protected String colo; + protected String clusterName; + protected String processName; + protected String processEndTime; + protected String inputFeedName; + protected String outputFeedName; + + private static FalconUnitClient client; + private static FileSystem fs; + protected static ConfigurationStore configStore; + private Map<String, String> overlay; + + public UnitTestContext() throws FalconException, IOException { + client = FalconUnit.getClient(); + fs = FalconUnit.getFileSystem(); + configStore = client.getConfigStore(); + overlay = getUniqueOverlay(); + } + + public static FalconUnitClient getClient() { + return client; + } + + private static void mkdir(FileSystem fileSystem, Path path) throws Exception { + if (!fileSystem.exists(path) && !fileSystem.mkdirs(path)) { + throw new Exception("mkdir failed for " + path); + } + } + + private static void mkdir(FileSystem fileSystem, Path path, FsPermission perm) throws Exception { + if (!fileSystem.exists(path) && !fileSystem.mkdirs(path, perm)) { + throw new Exception("mkdir failed for " + path); + } + } + + private void prepare() throws Exception { + mkdir(fs, new Path("/falcon"), new FsPermission((short) 511)); + + Path wfParent = new Path("/falcon/test"); + fs.delete(wfParent, true); + Path wfPath = new Path(wfParent, "workflow"); + mkdir(fs, wfPath); + mkdir(fs, new Path("/falcon/test/workflow/lib")); + fs.copyFromLocalFile(false, true, + new Path(TestContext.class.getResource("/sleepWorkflow.xml").getPath()), + new Path(wfPath, "workflow.xml")); + mkdir(fs, new Path(wfParent, "input/2012/04/20/00")); + mkdir(fs, new Path(wfParent, "input/2012/04/21/00")); + Path outPath = new Path(wfParent, "output"); + mkdir(fs, outPath, new FsPermission((short) 511)); + } + + public void scheduleProcess() throws Exception { + scheduleProcess(PROCESS_TEMPLATE, overlay, 1); + } + + public void scheduleProcessForPagination() throws Exception { + scheduleProcess(PROCESS_TEMPLATE, overlay, 2); + } + + public void scheduleProcess(String processTemplate, Map<String, String> uniqueOverlay, int numInstances) throws + Exception { + prepare(); + + String tmpFile = TestContext.overlayParametersOverTemplate(FEED_TEMPLATE1, uniqueOverlay); + APIResult result = client.submit(EntityType.FEED.name(), tmpFile, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + + tmpFile = TestContext.overlayParametersOverTemplate(FEED_TEMPLATE2, uniqueOverlay); + result = client.submit(EntityType.FEED.name(), tmpFile, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + + tmpFile = TestContext.overlayParametersOverTemplate(processTemplate, uniqueOverlay); + result = client.submit(EntityType.PROCESS.name(), tmpFile, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + + String scheduleTime = START_INSTANCE; + + result = client.schedule(EntityType.PROCESS, processName, scheduleTime, numInstances, clusterName, true, ""); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + } + + public Map<String, String> getUniqueOverlay() throws FalconException { + Map<String, String> uniqueOverlay = new HashMap<String, String>(); + long time = System.currentTimeMillis(); + clusterName = "cluster" + time; + uniqueOverlay.put("src.cluster.name", clusterName); + uniqueOverlay.put("cluster", clusterName); + uniqueOverlay.put("colo", DeploymentUtil.getCurrentColo()); + colo = DeploymentUtil.getCurrentColo(); + inputFeedName = "in" + time; + uniqueOverlay.put("inputFeedName", inputFeedName); + //only feeds with future dates can be scheduled + Date endDate = new Date(System.currentTimeMillis() + 15 * 60 * 1000); + uniqueOverlay.put("feedEndDate", SchemaHelper.formatDateUTC(endDate)); + processEndTime = SchemaHelper.formatDateUTC(endDate); + uniqueOverlay.put("processEndDate", processEndTime); + outputFeedName = "out" + time; + uniqueOverlay.put("outputFeedName", outputFeedName); + processName = "p" + time; + uniqueOverlay.put("processName", processName); + uniqueOverlay.put("user", System.getProperty("user.name")); + uniqueOverlay.put("workflow.path", "/falcon/test/workflow"); + uniqueOverlay.put("workflow.lib.path", "/falcon/test/workflow/lib"); + return uniqueOverlay; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/db0604da/webapp/src/test/resources/sleepWorkflow.xml ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/sleepWorkflow.xml b/webapp/src/test/resources/sleepWorkflow.xml new file mode 100644 index 0000000..9c31478 --- /dev/null +++ b/webapp/src/test/resources/sleepWorkflow.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-wf"> + <start to="java-node"/> + <action name="java-node"> + <java> + <job-tracker>local</job-tracker> + <name-node>jail://global:00</name-node> + <configuration> + <property> + <name>mapred.job.queue.name</name> + <value>default</value> + </property> + </configuration> + <main-class>org.apache.falcon.unit.examples.JavaSleepExample</main-class> + </java> + <ok to="end"/> + <error to="fail"/> + </action> + <kill name="fail"> + <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> + </kill> + <end name="end"/> +</workflow-app> http://git-wip-us.apache.org/repos/asf/falcon/blob/db0604da/webapp/src/test/resources/startup.properties ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/startup.properties b/webapp/src/test/resources/startup.properties new file mode 100644 index 0000000..756f315 --- /dev/null +++ b/webapp/src/test/resources/startup.properties @@ -0,0 +1,143 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +*.domain=debug + +######### Implementation classes ######### +## DONT MODIFY UNLESS SURE ABOUT CHANGE ## + +*.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine +*.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder +*.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder +*.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager +*.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService +*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager +*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService + +##### Falcon Services ##### +*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\ + org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ + org.apache.falcon.service.ProcessSubscriberService,\ + org.apache.falcon.entity.store.ConfigurationStore,\ + org.apache.falcon.rerun.service.RetryService,\ + org.apache.falcon.rerun.service.LateRunService,\ + org.apache.falcon.metadata.MetadataMappingService,\ + org.apache.falcon.service.ProxyUserService + +##### Falcon Configuration Store Change listeners ##### +*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ + org.apache.falcon.entity.ColoClusterRelation,\ + org.apache.falcon.group.FeedGroupMap,\ + org.apache.falcon.entity.store.FeedLocationStore,\ + org.apache.falcon.service.SharedLibraryHostingService + +##### JMS MQ Broker Implementation class ##### +*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory + + +######### System startup parameters ######### + +# Location to store user entity configurations +debug.config.store.uri=file://${user.dir}/target/store +debug.config.store.persist=false +debug.config.oozie.conf.uri=${user.dir}/target/oozie +debug.system.lib.location=${system.lib.location} +debug.broker.url=vm://localhost +debug.retry.recorder.path=${user.dir}/target/retry +debug.libext.feed.retention.paths=${falcon.libext} +debug.libext.feed.replication.paths=${falcon.libext} +debug.libext.process.paths=${falcon.libext} + +*.falcon.cleanup.service.frequency=minutes(5) + + +######### Properties for configuring JMS provider - activemq ######### +# Default Active MQ url +*.broker.url=tcp://localhost:61616 + +# default time-to-live for a JMS message 3 days (time in minutes) +*.broker.ttlInMins=4320 +*.entity.topic=FALCON.ENTITY.TOPIC +*.max.retry.failure.count=1 +*.retry.recorder.path=${user.dir}/logs/retry + +######### Properties for configuring iMon client and metric ######### +*.internal.queue.size=1000 + + +##### List of shared libraries for Falcon workflows ##### +*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3 + +######### Authentication Properties ######### + +# Authentication type must be specified: simple|kerberos +*.falcon.authentication.type=simple + +##### Service Configuration + +# Indicates the Kerberos principal to be used in Falcon Service. +*.falcon.service.authentication.kerberos.principal= + +# Location of the keytab file with the credentials for the Service principal. +*.falcon.service.authentication.kerberos.keytab= + +# name node principal to talk to config store +*.dfs.namenode.kerberos.principal= + +##### SPNEGO Configuration + +# Authentication type must be specified: simple|kerberos|<class> +# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility +*.falcon.http.authentication.type=simple + +# Indicates how long (in seconds) an authentication token is valid before it has to be renewed. +*.falcon.http.authentication.token.validity=36000 + +# The signature secret for signing the authentication tokens. +*.falcon.http.authentication.signature.secret=falcon + +# The domain to use for the HTTP cookie that stores the authentication token. +*.falcon.http.authentication.cookie.domain= + +# Indicates if anonymous requests are allowed when using 'simple' authentication. +*.falcon.http.authentication.simple.anonymous.allowed=false + +# Indicates the Kerberos principal to be used for HTTP endpoint. +# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification. +*.falcon.http.authentication.kerberos.principal= + +# Location of the keytab file with the credentials for the HTTP principal. +*.falcon.http.authentication.kerberos.keytab= + +# The kerberos names rules is to resolve kerberos principal names, refer to Hadoop's KerberosName for more details. +*.falcon.http.authentication.kerberos.name.rules=DEFAULT + +# Comma separated list of black listed users +*.falcon.http.authentication.blacklisted.users= + +######### Graph Database Properties ######### +# Graph implementation +*.falcon.graph.blueprints.graph=com.thinkaurelius.titan.core.TitanFactory + +# Graph Storage +*.falcon.graph.storage.directory=${user.dir}/target/graphdb +*.falcon.graph.storage.backend=berkeleyje +*.falcon.graph.serialize.path=${user.dir}/target/graphdb +*.falcon.graph.preserve.history=false +*.falcon.graph.transaction.retry.count=3 +*.falcon.graph.transaction.retry.delay=5 \ No newline at end of file
