Repository: falcon Updated Branches: refs/heads/master 4445798be -> 2804c5d1a
FALCON-1577 Migration of EntityManagerJerseyIT to use falcon unit (Narayan Periwal) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2804c5d1 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2804c5d1 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2804c5d1 Branch: refs/heads/master Commit: 2804c5d1a8be9d19c0198c66b45373bbfb9dc0e0 Parents: 4445798 Author: Pallavi Rao <[email protected]> Authored: Tue Dec 15 14:06:33 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Dec 15 14:06:33 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/falcon/unit/FalconUnitClient.java | 19 +- .../apache/falcon/unit/FalconUnitTestBase.java | 2 +- .../falcon/resource/EntityManagerJerseyIT.java | 945 +++++++++---------- .../resource/ProcessInstanceManagerIT.java | 52 +- .../apache/falcon/resource/UnitTestContext.java | 46 +- .../org/apache/falcon/util/OozieTestUtils.java | 54 ++ 7 files changed, 602 insertions(+), 518 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b594160..5753535 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -31,6 +31,8 @@ Trunk (Unreleased) FALCON-1213 Base framework of the native scheduler(Pallavi Rao) IMPROVEMENTS + FALCON-1577 Migration of EntityManagerJerseyIT to use falcon unit (Narayan Periwal via Pallavi Rao) + FALCON-1658 MySql Support for Native Scheduler(Pavan Kumar Kolamuri via Ajay Yadava) FALCON-1656 Improve FeedHelper:getRetentionFrequency method(Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 9eb4277..f34a90c 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -43,6 +43,7 @@ import org.apache.falcon.util.DateUtil; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; import org.apache.hadoop.security.authorize.AuthorizationException; +import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,9 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Date; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.TimeZone; @@ -354,14 +357,16 @@ public class FalconUnitClient extends AbstractFalconClient { public String getVersion(String doAsUser) throws FalconCLIException { AdminResource resource = new AdminResource(); AdminResource.PropertyList propertyList = resource.getVersion(); - StringBuilder properties = new StringBuilder(); - for(AdminResource.Property property : propertyList.properties) { - if (properties.length() > 1) { - properties.append(","); - } - properties.append(property.key).append(":").append(property.value); + Map<String, String> version = new LinkedHashMap<>(); + List<String> list = new ArrayList<>(); + for (AdminResource.Property property : propertyList.properties) { + Map<String, String> map = new LinkedHashMap<>(); + map.put("key", property.key); + map.put("value", property.value); + list.add(JSONValue.toJSONString(map)); } - return properties.toString(); + version.put("properties", list.toString()); + return version.toString(); } private boolean checkAndUpdateCluster(Entity entity, EntityType entityType, String cluster) { http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 2a73516..83afac7 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -94,7 +94,7 @@ public class FalconUnitTestBase { private static final String DEFAULT_COLO = "local"; private static final String CLUSTER = "cluster"; private static final String COLO = "colo"; - private static final String CLUSTER_TEMPLATE = "/local-cluster-template.xml"; + protected static final String CLUSTER_TEMPLATE = "/local-cluster-template.xml"; protected static final String STAGING_PATH = "/projects/falcon/staging"; protected static final String WORKING_PATH = "/projects/falcon/working"; http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java index abe5bdd..439d148 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java @@ -18,11 +18,12 @@ package org.apache.falcon.resource; import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import org.apache.falcon.cli.FalconCLI; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.client.FalconCLIException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.feed.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.Location; @@ -32,10 +33,14 @@ import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.entity.v0.process.Property; import org.apache.falcon.entity.v0.process.Validity; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.unit.FalconUnit; +import org.apache.falcon.unit.FalconUnitTestBase; import org.apache.falcon.util.BuildProperties; import org.apache.falcon.util.DeploymentProperties; import org.apache.falcon.util.FalconTestUtil; import org.apache.falcon.util.OozieTestUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsShell; @@ -46,19 +51,13 @@ import org.apache.oozie.client.Job; import org.apache.oozie.client.Job.Status; import org.apache.oozie.client.OozieClient; import org.testng.Assert; -import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import javax.servlet.ServletInputStream; import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.xml.bind.JAXBException; import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.io.StringReader; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -81,32 +80,46 @@ import java.util.regex.Pattern; * Tests should be enabled only in local environments as they need running instance of the web server. */ @Test(groups = {"exhaustive"}) -public class EntityManagerJerseyIT { +public class EntityManagerJerseyIT extends FalconUnitTestBase { + + private static final String START_INSTANCE = "2012-04-20T00:00Z"; + private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml"; + private static final String LOCAL_MODE = "local"; + private static final String IT_RUN_MODE = "it.run.mode"; @BeforeClass - public void prepare() throws Exception { - TestContext.prepare(); + @Override + public void setup() throws FalconException, IOException { + String version = System.getProperty("project.version"); + String buildDir = System.getProperty("project.build.directory"); + System.setProperty("falcon.libext", buildDir + "/../../unit/target/falcon-unit-" + version + ".jar"); + super.setup(); } - @AfterClass - public void tearDown() throws Exception { - TestContext.deleteEntitiesFromStore(); + @AfterMethod + @Override + public void cleanUpActionXml() throws IOException, FalconException { + //Needed since oozie writes action xml to current directory. + FileUtils.deleteQuietly(new File("action.xml")); + FileUtils.deleteQuietly(new File(".action.xml.crc")); + contexts.remove(); + } + + private ThreadLocal<UnitTestContext> contexts = new ThreadLocal<UnitTestContext>(); + + private UnitTestContext newContext() throws FalconException, IOException { + contexts.set(new UnitTestContext()); + return contexts.get(); } static void assertLibs(FileSystem fs, Path path) throws IOException { FileStatus[] libs = fs.listStatus(path); Assert.assertNotNull(libs); - Assert.assertEquals(libs.length, 1); - Assert.assertTrue(libs[0].getPath().getName().startsWith("falcon-hadoop-dependencies")); } - private Entity getDefinition(TestContext context, EntityType type, String name) throws Exception { - ClientResponse response = - context.service.path("api/entities/definition/" + type.name().toLowerCase() + "/" + name) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .get(ClientResponse.class); - return (Entity) type.getUnmarshaller().unmarshal(new StringReader(response.getEntity(String.class))); + private Entity getDefinition(EntityType type, String name) throws Exception { + Entity entity = falconUnitClient.getDefinition(type.name(), name, null); + return entity; } private void updateEndtime(Process process) { @@ -114,17 +127,61 @@ public class EntityManagerJerseyIT { processValidity.setEnd(new Date(new Date().getTime() + 2 * 24 * 60 * 60 * 1000)); } + private void submitCluster(UnitTestContext context) throws IOException, FalconCLIException { + String mode = System.getProperty(IT_RUN_MODE); + if (StringUtils.isNotEmpty(mode) && mode.toLowerCase().equals(LOCAL_MODE)) { + submitCluster(context.colo, context.clusterName, null); + } else { + String tmpFile = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, context.overlay); + submit(EntityType.CLUSTER, tmpFile); + } + } + + private APIResult submitFeed(String template, Map<String, String> overlay) throws IOException, FalconCLIException { + String tmpFile = TestContext.overlayParametersOverTemplate(template, overlay); + APIResult result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null); + return result; + } + + private void submitProcess(String template, Map<String, String> overlay) throws IOException, FalconCLIException { + String tmpFile = TestContext.overlayParametersOverTemplate(template, overlay); + APIResult result = falconUnitClient.submit(EntityType.PROCESS.name(), tmpFile, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + } + + private void scheduleProcess(UnitTestContext context) throws FalconCLIException, IOException, FalconException { + String scheduleTime = START_INSTANCE; + APIResult result = scheduleProcess(context.getProcessName(), scheduleTime, 1, context.getClusterName(), + getAbsolutePath(SLEEP_WORKFLOW), true, ""); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + } + + private void schedule(UnitTestContext context) throws Exception { + submitCluster(context); + context.prepare(); + submitFeeds(context.overlay); + submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay); + scheduleProcess(context); + } + + private void submitFeeds(Map<String, String> overlay) throws IOException, FalconCLIException { + String tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE1, overlay); + APIResult result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE2, overlay); + result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + } + @Test public void testLibExtensions() throws Exception { - TestContext context = newContext(); - Map<String, String> overlay = context.getUniqueOverlay(); - ClientResponse response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); - context.assertSuccessful(response); - FileSystem fs = context.getCluster().getFileSystem(); + UnitTestContext context = newContext(); + submitCluster(context); + FileSystem fs = FalconUnit.getFileSystem(); assertLibs(fs, new Path("/projects/falcon/working/libext/FEED/retention")); assertLibs(fs, new Path("/projects/falcon/working/libext/PROCESS")); - String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay); + String tmpFileName = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE1, context.overlay); Feed feed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(new File(tmpFileName)); Location location = new Location(); location.setPath("fsext://global:00/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}"); @@ -133,63 +190,44 @@ public class EntityManagerJerseyIT { cluster.setLocations(new Locations()); feed.getClusters().getClusters().get(0).getLocations().getLocations().add(location); - File tmpFile = TestContext.getTempFile(); + File tmpFile = UnitTestContext.getTempFile(); EntityType.FEED.getMarshaller().marshal(feed, tmpFile); - response = context.submitAndSchedule(tmpFileName, overlay, EntityType.FEED); - context.assertSuccessful(response); - } - private ClientResponse update(TestContext context, Entity entity, - Date endTime, Boolean skipDryRun) throws Exception { - File tmpFile = TestContext.getTempFile(); - entity.getEntityType().getMarshaller().marshal(entity, tmpFile); - WebResource resource = context.service.path("api/entities/update/" - + entity.getEntityType().name().toLowerCase() + "/" + entity.getName()); - if (endTime != null) { - resource = resource.queryParam("effective", SchemaHelper.formatDateUTC(endTime)); - } - if (null != skipDryRun) { - resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); - } - return resource.header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath())); - } - - private ClientResponse touch(TestContext context, Entity entity, Boolean skipDryRun) { - WebResource resource = context.service.path("api/entities/touch/" - + entity.getEntityType().name().toLowerCase() + "/" + entity.getName()); - if (null != skipDryRun) { - resource = resource.queryParam("skipDryRun", String.valueOf(skipDryRun)); - } - return resource - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML) - .post(ClientResponse.class); + APIResult result = falconUnitClient.submitAndSchedule(EntityType.FEED.name(), tmpFile.getAbsolutePath(), true, + null, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); } @Test public void testUpdateCheckUser() throws Exception { - TestContext context = newContext(); - Map<String, String> overlay = context.getUniqueOverlay(); - String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); + UnitTestContext context = newContext(); + String tmpFileName = TestContext.overlayParametersOverTemplate(UnitTestContext.PROCESS_TEMPLATE, + context.overlay); Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName)); updateEndtime(process); - File tmpFile = TestContext.getTempFile(); + File tmpFile = UnitTestContext.getTempFile(); EntityType.PROCESS.getMarshaller().marshal(process, tmpFile); - context.scheduleProcess(tmpFile.getAbsolutePath(), overlay); - OozieTestUtils.waitForBundleStart(context, Status.RUNNING); + submitCluster(context); + context.prepare(); + submitFeeds(context.overlay); + submitProcess(tmpFile.getAbsolutePath(), context.overlay); + scheduleProcess(context.getProcessName(), context.getClusterName(), getAbsolutePath(SLEEP_WORKFLOW)); + waitForStatus(EntityType.PROCESS.name(), context.getProcessName(), START_INSTANCE, + InstancesResult.WorkflowStatus.RUNNING); List<BundleJob> bundles = OozieTestUtils.getBundles(context); Assert.assertEquals(bundles.size(), 1); Assert.assertEquals(bundles.get(0).getUser(), TestContext.REMOTE_USER); - Feed feed = (Feed) getDefinition(context, EntityType.FEED, context.outputFeedName); + Feed feed = (Feed) getDefinition(EntityType.FEED, context.outputFeedName); //change output feed path and update feed as another user feed.getLocations().getLocations().get(0).setPath("/falcon/test/output2/${YEAR}/${MONTH}/${DAY}"); - ClientResponse response = update(context, feed, null, false); - context.assertSuccessful(response); + tmpFile = TestContext.getTempFile(); + feed.getEntityType().getMarshaller().marshal(feed, tmpFile); + APIResult result = falconUnitClient.update(EntityType.FEED.name(), feed.getName(), + tmpFile.getAbsolutePath(), true, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); bundles = OozieTestUtils.getBundles(context); Assert.assertEquals(bundles.size(), 2); @@ -197,27 +235,9 @@ public class EntityManagerJerseyIT { Assert.assertEquals(bundles.get(1).getUser(), TestContext.REMOTE_USER); } - private ThreadLocal<TestContext> contexts = new ThreadLocal<TestContext>(); - - private TestContext newContext() { - contexts.set(new TestContext()); - return contexts.get(); - } - - @AfterMethod - public void cleanup() throws Exception { - TestContext testContext = contexts.get(); - if (testContext != null) { - OozieTestUtils.killOozieJobs(testContext); - } - - contexts.remove(); - } - public void testOptionalInput() throws Exception { - TestContext context = newContext(); - Map<String, String> overlay = context.getUniqueOverlay(); - String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); + UnitTestContext context = newContext(); + String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, context.overlay); Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName)); Input in1 = process.getInputs().getInputs().get(0); @@ -232,14 +252,14 @@ public class EntityManagerJerseyIT { File tmpFile = TestContext.getTempFile(); EntityType.PROCESS.getMarshaller().marshal(process, tmpFile); - context.scheduleProcess(tmpFile.getAbsolutePath(), overlay); + schedule(context); } public void testDryRun() throws Exception { //Schedule of invalid process should fail because of dryRun, and should pass when dryrun is skipped - TestContext context = newContext(); - Map<String, String> overlay = context.getUniqueOverlay(); - String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); + UnitTestContext context = newContext(); + String tmpFileName = TestContext.overlayParametersOverTemplate(UnitTestContext.PROCESS_TEMPLATE, + context.overlay); Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName)); Property prop = new Property(); prop.setName("newProp"); @@ -248,10 +268,14 @@ public class EntityManagerJerseyIT { File tmpFile = TestContext.getTempFile(); EntityType.PROCESS.getMarshaller().marshal(process, tmpFile); - ClientResponse response = context.validate(tmpFile.getAbsolutePath(), overlay, EntityType.PROCESS); - context.assertFailure(response); - - context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, false); + try { + falconUnitClient.validate(EntityType.PROCESS.name(), tmpFile.getAbsolutePath(), + true, null); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore + } + schedule(context); //Fix the process and then submitAndSchedule should succeed Iterator<Property> itr = process.getProperties().getProperties().iterator(); @@ -264,37 +288,43 @@ public class EntityManagerJerseyIT { tmpFile = TestContext.getTempFile(); process.setName("process" + System.currentTimeMillis()); EntityType.PROCESS.getMarshaller().marshal(process, tmpFile); - response = context.submitAndSchedule(tmpFile.getAbsolutePath(), overlay, EntityType.PROCESS); - context.assertSuccessful(response); - - //Update with invalid property should fail again - process.getProperties().getProperties().add(prop); - updateEndtime(process); - response = update(context, process, null, null); - context.assertFailure(response); + APIResult result = falconUnitClient.submitAndSchedule(EntityType.PROCESS.name(), + tmpFile.getAbsolutePath(), true, null, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); // update where dryrun is disabled should succeed. - response = update(context, process, null, true); - context.assertSuccessful(response); - + tmpFile = TestContext.getTempFile(); + process.getEntityType().getMarshaller().marshal(process, tmpFile); + result = falconUnitClient.update(EntityType.PROCESS.name(), process.getName(), + tmpFile.getAbsolutePath(), true, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); } @Test public void testUpdateSuspendedEntity() throws Exception { - TestContext context = newContext(); - context.scheduleProcess(); - OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING); + UnitTestContext context = newContext(); + schedule(context); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, + InstancesResult.WorkflowStatus.RUNNING); //Suspend entity - Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName); - ClientResponse response = suspend(context, process); - context.assertSuccessful(response); + Process process = (Process) getDefinition(EntityType.PROCESS, context.processName); + APIResult result = falconUnitClient.suspend(EntityType.PROCESS, process.getName(), context.colo, + null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + result = falconUnitClient.getStatus(EntityType.PROCESS, context.processName, context.clusterName, + null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertEquals(result.getMessage(), "SUSPENDED"); process.getProperties().getProperties().get(0).setName("newprop"); Date endTime = getEndTime(); process.getClusters().getClusters().get(0).getValidity().setEnd(endTime); - response = update(context, process, endTime, null); - context.assertSuccessful(response); + File tmpFile = TestContext.getTempFile(); + process.getEntityType().getMarshaller().marshal(process, tmpFile); + result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(), + tmpFile.getAbsolutePath(), true, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); //Since the process endtime = update effective time, it shouldn't create new bundle List<BundleJob> bundles = OozieTestUtils.getBundles(context); @@ -306,24 +336,26 @@ public class EntityManagerJerseyIT { @Test public void testProcessInputUpdate() throws Exception { - TestContext context = newContext(); - context.scheduleProcess(); - OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING); + UnitTestContext context = newContext(); + + schedule(context); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, + InstancesResult.WorkflowStatus.RUNNING); List<BundleJob> bundles = OozieTestUtils.getBundles(context); Assert.assertEquals(bundles.size(), 1); - OozieClient ozClient = OozieTestUtils.getOozieClient(context.getCluster().getCluster()); + OozieClient ozClient = OozieTestUtils.getOozieClient(context); String bundle = bundles.get(0).getId(); String coordId = ozClient.getBundleJobInfo(bundle).getCoordinators().get(0).getId(); - Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName); + Process process = (Process) getDefinition(EntityType.PROCESS, context.processName); String feed3 = "f3" + System.currentTimeMillis(); Map<String, String> overlay = new HashMap<String, String>(); overlay.put("inputFeedName", feed3); overlay.put("cluster", context.clusterName); overlay.put("user", System.getProperty("user.name")); - ClientResponse response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED); - context.assertSuccessful(response); + submitFeed(UnitTestContext.FEED_TEMPLATE1, overlay); + Input input = new Input(); input.setFeed(feed3); @@ -334,8 +366,11 @@ public class EntityManagerJerseyIT { updateEndtime(process); Date endTime = getEndTime(); - response = update(context, process, endTime, null); - context.assertSuccessful(response); + File tmpFile = TestContext.getTempFile(); + process.getEntityType().getMarshaller().marshal(process, tmpFile); + APIResult result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(), + tmpFile.getAbsolutePath(), true, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); //Assert that update creates new bundle and old coord is running bundles = OozieTestUtils.getBundles(context); @@ -360,16 +395,20 @@ public class EntityManagerJerseyIT { Assert.assertEquals(coord.getStartTime(), endTime); } + @Test public void testProcessEndtimeUpdate() throws Exception { - TestContext context = newContext(); - context.scheduleProcess(); + UnitTestContext context = newContext(); + schedule(context); OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING); - Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName); + Process process = (Process) getDefinition(EntityType.PROCESS, context.processName); updateEndtime(process); - ClientResponse response = update(context, process, null, null); - context.assertSuccessful(response); + File tmpFile = TestContext.getTempFile(); + process.getEntityType().getMarshaller().marshal(process, tmpFile); + APIResult result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(), + tmpFile.getAbsolutePath(), true, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); //Assert that update does not create new bundle List<BundleJob> bundles = OozieTestUtils.getBundles(context); @@ -378,26 +417,28 @@ public class EntityManagerJerseyIT { @Test public void testTouchEntity() throws Exception { - TestContext context = newContext(); - context.scheduleProcess(); + UnitTestContext context = newContext(); + schedule(context); OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING); List<BundleJob> bundles = OozieTestUtils.getBundles(context); Assert.assertEquals(bundles.size(), 1); - OozieClient ozClient = OozieTestUtils.getOozieClient(context.getCluster().getCluster()); + OozieClient ozClient = OozieTestUtils.getOozieClient(context); String bundle = bundles.get(0).getId(); String coordId = ozClient.getBundleJobInfo(bundle).getCoordinators().get(0).getId(); //Update end time of process required for touch - Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName); + Process process = (Process) getDefinition(EntityType.PROCESS, context.processName); updateEndtime(process); - ClientResponse response = update(context, process, null, null); - context.assertSuccessful(response); + File tmpFile = TestContext.getTempFile(); + process.getEntityType().getMarshaller().marshal(process, tmpFile); + APIResult result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(), + tmpFile.getAbsolutePath(), true, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); bundles = OozieTestUtils.getBundles(context); Assert.assertEquals(bundles.size(), 1); - //Calling force update - response = touch(context, process, true); - context.assertSuccessful(response); + result = falconUnitClient.touch(EntityType.PROCESS.name(), context.getProcessName(), context.colo, true, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); OozieTestUtils.waitForBundleStart(context, Status.PREP, Status.RUNNING); //Assert that touch creates new bundle and old coord is running @@ -421,101 +462,72 @@ public class EntityManagerJerseyIT { } public void testStatus() throws Exception { - TestContext context = newContext(); - ClientResponse response; - Map<String, String> overlay = context.getUniqueOverlay(); - - response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); - context.assertSuccessful(response); + UnitTestContext context = newContext(); - response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED); - context.assertSuccessful(response); + submitCluster(context); - response = context.service - .path("api/entities/status/feed/" + overlay.get("inputFeedName")) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .get(ClientResponse.class); - - APIResult result = (APIResult) context.unmarshaller. - unmarshal(new StringReader(response.getEntity(String.class))); - Assert.assertTrue(result.getMessage().contains("SUBMITTED")); + context.prepare(); + submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay); + APIResult result = falconUnitClient.getStatus(EntityType.FEED, context.overlay.get("inputFeedName"), + context.colo, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertEquals(result.getMessage(), "SUBMITTED"); } public void testIdempotentSubmit() throws Exception { - TestContext context = newContext(); - ClientResponse response; - Map<String, String> overlay = context.getUniqueOverlay(); + UnitTestContext context = newContext(); - response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); - context.assertSuccessful(response); + submitCluster(context); - response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); - context.assertSuccessful(response); + submitCluster(context); } - public void testNotFoundStatus() { - TestContext context = newContext(); - ClientResponse response; + public void testNotFoundStatus() throws FalconException, IOException, FalconCLIException { String feed1 = "f1" + System.currentTimeMillis(); - response = context.service - .path("api/entities/status/feed/" + feed1) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_PLAIN) - .get(ClientResponse.class); - - Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode()); + try { + falconUnitClient.getStatus(EntityType.FEED, feed1, null, null); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore + } } - public void testVersion() { - TestContext context = newContext(); - ClientResponse response; - response = context.service - .path("api/admin/version") - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.APPLICATION_JSON) - .get(ClientResponse.class); - String json = response.getEntity(String.class); + public void testVersion() throws FalconException, IOException, FalconCLIException { + String json = falconUnitClient.getVersion(null); String buildVersion = BuildProperties.get().getProperty("build.version"); String deployMode = DeploymentProperties.get().getProperty("deploy.mode"); Assert.assertTrue(Pattern.matches( - ".*\\{\\s*\"key\"\\s*:\\s*\"Version\"\\s*,\\s*\"value\"\\s*:\\s*\"" - + buildVersion + "\"\\s*}.*", json), + ".*\\{\\s*\"key\"\\s*:\\s*\"Version\"\\s*,\\s*\"value\"\\s*:\\s*\"" + + buildVersion + "\"\\s*}.*", json), "No build.version found in /api/admin/version"); Assert.assertTrue(Pattern.matches( - ".*\\{\\s*\"key\"\\s*:\\s*\"Mode\"\\s*,\\s*\"value\"\\s*:\\s*\"" - + deployMode + "\"\\s*}.*", json), + ".*\\{\\s*\"key\"\\s*:\\s*\"Mode\"\\s*,\\s*\"value\"\\s*:\\s*\"" + + deployMode + "\"\\s*}.*", json), "No deploy.mode found in /api/admin/version"); } - public void testValidate() { - TestContext context = newContext(); - ServletInputStream stream = context.getServletInputStream(getClass(). - getResourceAsStream(TestContext.SAMPLE_PROCESS_XML)); - - ClientResponse clientResponse = context.service - .path("api/entities/validate/process") - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML) - .post(ClientResponse.class, stream); - - context.assertFailure(clientResponse); + public void testValidate() throws FalconException, IOException { + UnitTestContext context = newContext(); + try { + falconUnitClient.validate(EntityType.PROCESS.name(), + UnitTestContext.class.getResource(UnitTestContext.SAMPLE_PROCESS_XML).getPath(), true, null); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore + } } public void testClusterValidate() throws Exception { - TestContext context = newContext(); - ClientResponse clientResponse; - Map<String, String> overlay = context.getUniqueOverlay(); - - InputStream stream = context.getServletInputStream( - TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay)); - - clientResponse = context.service.path("api/entities/validate/cluster") - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML) - .post(ClientResponse.class, stream); - context.assertSuccessful(clientResponse); + UnitTestContext context = newContext(); + + String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, context.overlay); + File tmpFile = new File(tmpFileName); + fs.mkdirs(new Path(STAGING_PATH), HadoopClientFactory.ALL_PERMISSION); + fs.mkdirs(new Path(WORKING_PATH), HadoopClientFactory.READ_EXECUTE_PERMISSION); + APIResult result = falconUnitClient.validate(EntityType.CLUSTER.name(), tmpFile.getAbsolutePath(), + true, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); } ClientResponse suspend(TestContext context, Entity entity) { @@ -531,106 +543,91 @@ public class EntityManagerJerseyIT { } public void testClusterSubmitScheduleSuspendResumeDelete() throws Exception { - TestContext context = newContext(); - ClientResponse clientResponse; - Map<String, String> overlay = context.getUniqueOverlay(); - - clientResponse = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, - EntityType.CLUSTER); - context.assertSuccessful(clientResponse); - - clientResponse = context.service - .path("api/entities/schedule/cluster/" + context.clusterName) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML) - .post(ClientResponse.class); - context.assertFailure(clientResponse); - - clientResponse = suspend(context, EntityType.CLUSTER, context.clusterName); - context.assertFailure(clientResponse); - - clientResponse = context.service - .path("api/entities/resume/cluster/" + context.clusterName) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML) - .post(ClientResponse.class); - context.assertFailure(clientResponse); - - clientResponse = context.service - .path("api/entities/delete/cluster/" + context.clusterName) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .delete(ClientResponse.class); - context.assertSuccessful(clientResponse); + UnitTestContext context = newContext(); + + submitCluster(context); + + try { + falconUnitClient.schedule(EntityType.CLUSTER, context.clusterName, null, true, null, null); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore + } + + try { + falconUnitClient.suspend(EntityType.CLUSTER, context.clusterName, context.colo, null); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore + } + + try { + falconUnitClient.resume(EntityType.CLUSTER, context.clusterName, context.colo, null); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore + } + + APIResult result = falconUnitClient.delete(EntityType.CLUSTER, context.getClusterName(), null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); } public void testSubmit() throws Exception { - TestContext context = newContext(); + UnitTestContext context = newContext(); ClientResponse response; - Map<String, String> overlay = context.getUniqueOverlay(); - response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); - context.assertSuccessful(response); + submitCluster(context); - response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED); - context.assertSuccessful(response); + submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay); - response = context.submitToFalcon(TestContext.FEED_TEMPLATE2, overlay, EntityType.FEED); - context.assertSuccessful(response); + submitFeed(UnitTestContext.FEED_TEMPLATE2, context.overlay); - response = context.submitToFalcon(TestContext.PROCESS_TEMPLATE, overlay, EntityType.PROCESS); - context.assertSuccessful(response); + submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay); } @Test public void testDuplicateSubmitCommands() throws Exception { - TestContext context = newContext(); - Map<String, String> overlay = context.getUniqueOverlay(); + UnitTestContext context = newContext(); - context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); + submitCluster(context); ExecutorService service = Executors.newSingleThreadExecutor(); ExecutorService duplicateService = Executors.newSingleThreadExecutor(); - Future<ClientResponse> future = service.submit(new SubmitCommand(context, overlay)); - Future<ClientResponse> duplicateFuture = duplicateService.submit(new SubmitCommand(context, overlay)); - - ClientResponse response = future.get(); - ClientResponse duplicateSubmitThreadResponse = duplicateFuture.get(); + Future<APIResult> future = service.submit(new SubmitCommand(context, context.overlay)); + Future<APIResult> duplicateFuture = duplicateService.submit(new SubmitCommand(context, context.overlay)); // since there are duplicate threads for submits, there is no guarantee which request will succeed. - testDuplicateCommandsResponse(context, response, duplicateSubmitThreadResponse); + try { + APIResult response = future.get(); + APIResult duplicateSubmitThreadResponse = duplicateFuture.get(); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore + } } @Test public void testDuplicateDeleteCommands() throws Exception { - TestContext context = newContext(); - Map<String, String> overlay = context.getUniqueOverlay(); - context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); - context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED); + UnitTestContext context = newContext(); + Map<String, String> overlay = context.overlay; + submitCluster(context); + submitFeed(UnitTestContext.FEED_TEMPLATE1, overlay); ExecutorService service = Executors.newFixedThreadPool(2); - Future<ClientResponse> future = service.submit(new DeleteCommand(context, overlay.get("inputFeedName"), + Future<APIResult> future = service.submit(new DeleteCommand(context, overlay.get("inputFeedName"), "feed")); - Future<ClientResponse> duplicateFuture = service.submit(new DeleteCommand(context, + Future<APIResult> duplicateFuture = service.submit(new DeleteCommand(context, overlay.get("inputFeedName"), "feed")); - ClientResponse response = future.get(); - ClientResponse duplicateSubmitThreadResponse = duplicateFuture.get(); - - // since there are two threads for deletion, there is no guarantee which request will succeed. - testDuplicateCommandsResponse(context, response, duplicateSubmitThreadResponse); - } - - private void testDuplicateCommandsResponse(TestContext context, ClientResponse response, - ClientResponse duplicateSubmitThreadResponse) { - if (response.getStatus() == Response.Status.OK.getStatusCode()) { - context.assertSuccessful(response); - context.assertFailure(duplicateSubmitThreadResponse); - } else { - context.assertFailure(response); - context.assertSuccessful(duplicateSubmitThreadResponse); + // since there are duplicate threads for submits, there is no guarantee which request will succeed. + try { + APIResult response = future.get(); + APIResult duplicateSubmitThreadResponse = duplicateFuture.get(); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore } } @@ -643,105 +640,144 @@ public class EntityManagerJerseyIT { } private void scheduleAndDeleteProcess(boolean withDoAs) throws Exception { - TestContext context = newContext(); + UnitTestContext context = newContext(); + submitCluster(context); + context.prepare(); + submitFeeds(context.overlay); ClientResponse clientResponse; - Map<String, String> overlay = context.getUniqueOverlay(); - String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); + String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, context.overlay); Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName)); updateEndtime(process); File tmpFile = TestContext.getTempFile(); EntityType.PROCESS.getMarshaller().marshal(process, tmpFile); + submitProcess(tmpFile.getAbsolutePath(), context.overlay); if (withDoAs) { - context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, null, FalconTestUtil.TEST_USER_2, null); + falconUnitClient.schedule(EntityType.PROCESS, context.getProcessName(), context.getClusterName(), false, + FalconTestUtil.TEST_USER_2, null); } else { - context.scheduleProcess(tmpFile.getAbsolutePath(), overlay, null, "", "key1:value1"); + falconUnitClient.schedule(EntityType.PROCESS, context.getProcessName(), context.getClusterName(), false, "", + "key1:value1"); } OozieTestUtils.waitForBundleStart(context, Status.RUNNING); - WebResource resource = context.service.path("api/entities/delete/process/" + context.processName); - + APIResult result; if (withDoAs) { - resource = resource.queryParam(FalconCLI.DO_AS_OPT, FalconTestUtil.TEST_USER_2); + result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), FalconTestUtil.TEST_USER_2); + } else { + result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), null); } - - //Delete a scheduled process - clientResponse = resource - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .delete(ClientResponse.class); - context.assertSuccessful(clientResponse); + assertStatus(result); } public void testGetEntityDefinition() throws Exception { - TestContext context = newContext(); - ClientResponse response; - Map<String, String> overlay = context.getUniqueOverlay(); + UnitTestContext context = newContext(); - response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); - context.assertSuccessful(response); + submitCluster(context); - response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED); - context.assertSuccessful(response); + context.prepare(); + APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); - response = context.service - .path("api/entities/definition/feed/" + overlay.get("inputFeedName")) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .get(ClientResponse.class); + Feed feed = (Feed) falconUnitClient.getDefinition(EntityType.FEED.name(), + context.overlay.get("inputFeedName"), null); + Assert.assertEquals(feed.getName(), context.overlay.get("inputFeedName")); + } - String feedXML = response.getEntity(String.class); + public void testInvalidGetEntityDefinition() throws FalconException, IOException, FalconCLIException { try { - Feed result = (Feed) context.unmarshaller. - unmarshal(new StringReader(feedXML)); - Assert.assertEquals(result.getName(), overlay.get("inputFeedName")); - } catch (JAXBException e) { - Assert.fail("Reponse " + feedXML + " is not valid", e); + falconUnitClient.getDefinition(EntityType.PROCESS.name(), "sample1", null); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore } } - public void testInvalidGetEntityDefinition() { - TestContext context = newContext(); - ClientResponse clientResponse = context.service - .path("api/entities/definition/process/sample1") - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .get(ClientResponse.class); - context.assertFailure(clientResponse); - } - public void testScheduleSuspendResume() throws Exception { - TestContext context = newContext(); - context.scheduleProcess(); - - ClientResponse clientResponse = suspend(context, EntityType.PROCESS, context.processName); - context.assertSuccessful(clientResponse); - - clientResponse = context.service - .path("api/entities/resume/process/" + context.processName) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .post(ClientResponse.class); - context.assertSuccessful(clientResponse); + UnitTestContext context = newContext(); + + schedule(context); + waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, + InstancesResult.WorkflowStatus.RUNNING); + + APIResult result = falconUnitClient.suspend(EntityType.PROCESS, context.getProcessName(), + context.colo, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + result = falconUnitClient.getStatus(EntityType.PROCESS, context.processName, context.clusterName, + null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertEquals(result.getMessage(), "SUSPENDED"); + + result = falconUnitClient.resume(EntityType.PROCESS, context.getProcessName(), + context.colo, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + result = falconUnitClient.getStatus(EntityType.PROCESS, context.processName, context.clusterName, + null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + Assert.assertEquals(result.getMessage(), "RUNNING"); } public void testFeedSchedule() throws Exception { - TestContext context = newContext(); - ClientResponse response; - Map<String, String> overlay = context.getUniqueOverlay(); + UnitTestContext context = newContext(); - response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); - context.assertSuccessful(response); + submitCluster(context); - response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED); - context.assertSuccessful(response); + context.prepare(); + APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); - createTestData(context); - ClientResponse clientResponse = context.service - .path("api/entities/schedule/feed/" + overlay.get("inputFeedName")) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML) - .post(ClientResponse.class); - context.assertSuccessful(clientResponse); + createTestData(); + result = falconUnitClient.schedule(EntityType.FEED, context.overlay.get("inputFeedName"), null, true, null, + null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + } + + static List<Path> createTestData() throws Exception { + List<Path> list = new ArrayList<Path>(); + fs.mkdirs(new Path("/user/guest")); + fs.setOwner(new Path("/user/guest"), TestContext.REMOTE_USER, "users"); + + DateFormat formatter = new SimpleDateFormat("yyyy/MM/dd/HH/mm"); + formatter.setTimeZone(TimeZone.getTimeZone("UTC")); + Date date = new Date(System.currentTimeMillis() + 3 * 3600000); + Path path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file"); + fs.create(path).close(); + date = new Date(date.getTime() - 3600000); + path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file"); + fs.create(path).close(); + date = new Date(date.getTime() - 3600000); + path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file"); + fs.create(path).close(); + date = new Date(date.getTime() - 3600000); + path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file"); + list.add(path); + fs.create(path).close(); + date = new Date(date.getTime() - 3600000); + path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file"); + list.add(path); + fs.create(path).close(); + date = new Date(date.getTime() - 3600000); + path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file"); + list.add(path); + fs.create(path).close(); + date = new Date(date.getTime() - 3600000); + path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file"); + list.add(path); + fs.create(path).close(); + date = new Date(date.getTime() - 3600000); + path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file"); + list.add(path); + fs.create(path).close(); + date = new Date(date.getTime() - 3600000); + path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file"); + list.add(path); + fs.create(path).close(); + date = new Date(date.getTime() - 3600000); + path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file"); + list.add(path); + fs.create(path).close(); + new FsShell(new Configuration()).run(new String[] { + "-chown", "-R", "guest:users", "/examples/input-data/rawLogs", }); + return list; } static List<Path> createTestData(TestContext context) throws Exception { @@ -795,135 +831,87 @@ public class EntityManagerJerseyIT { } public void testDeleteDataSet() throws Exception { - TestContext context = newContext(); - ClientResponse response; - Map<String, String> overlay = context.getUniqueOverlay(); + UnitTestContext context = newContext(); - response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); - context.assertSuccessful(response); + submitCluster(context); - response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED); - context.assertSuccessful(response); + context.prepare(); + APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); - response = context.service - .path("api/entities/delete/feed/" + overlay.get("inputFeedName")) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .delete(ClientResponse.class); - context.assertSuccessful(response); + result = falconUnitClient.delete(EntityType.FEED, context.overlay.get("inputFeedName"), null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); } public void testDelete() throws Exception { - TestContext context = newContext(); - ClientResponse response; - Map<String, String> overlay = context.getUniqueOverlay(); + UnitTestContext context = newContext(); - response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER); - context.assertSuccessful(response); + submitCluster(context); - response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED); - context.assertSuccessful(response); + context.prepare(); + APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); - response = context.service - .path("api/entities/delete/cluster/" + context.clusterName) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .delete(ClientResponse.class); - context.assertFailure(response); + try { + falconUnitClient.delete(EntityType.CLUSTER, context.getClusterName(), null); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore + } - response = context.submitToFalcon(TestContext.FEED_TEMPLATE2, overlay, EntityType.FEED); - context.assertSuccessful(response); + result = submitFeed(UnitTestContext.FEED_TEMPLATE2, context.overlay); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); - response = context.submitToFalcon(TestContext.PROCESS_TEMPLATE, overlay, EntityType.PROCESS); - context.assertSuccessful(response); + submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay); //Delete a referred feed - response = context.service - .path("api/entities/delete/feed/" + overlay.get("inputFeedName")) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .delete(ClientResponse.class); - context.assertFailure(response); + try { + falconUnitClient.delete(EntityType.FEED, context.overlay.get("inputFeedName"), null); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore + } //Delete a submitted process - response = context.service - .path("api/entities/delete/process/" + context.processName) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .delete(ClientResponse.class); - context.assertSuccessful(response); - - response = context.submitToFalcon(TestContext.PROCESS_TEMPLATE, overlay, EntityType.PROCESS); - context.assertSuccessful(response); - - ClientResponse clientResponse = context.service - .path("api/entities/schedule/process/" + context.processName) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML) - .post(ClientResponse.class); - context.assertSuccessful(clientResponse); + result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + + submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay); + + result = falconUnitClient.schedule(EntityType.PROCESS, context.getProcessName(), null, true, null, + null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); //Delete a scheduled process - response = context.service - .path("api/entities/delete/process/" + context.processName) - .header("Cookie", context.getAuthenticationToken()) - .accept(MediaType.TEXT_XML) - .delete(ClientResponse.class); - context.assertSuccessful(response); + result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); } @Test public void testGetEntityList() throws Exception { - TestContext context = newContext(); - ClientResponse response; - response = context.service - .path("api/entities/list/process/") - .header("Cookie", context.getAuthenticationToken()) - .type(MediaType.TEXT_XML) - .accept(MediaType.TEXT_XML) - .get(ClientResponse.class); - Assert.assertEquals(response.getStatus(), 200); - - EntityList result = response.getEntity(EntityList.class); + EntityList result = falconUnitClient.getEntityList(EntityType.PROCESS.name(), "", "", null, null, + null, null, null, new Integer(0), new Integer(1), null); Assert.assertNotNull(result); for (EntityList.EntityElement entityElement : result.getElements()) { Assert.assertNull(entityElement.status); // status is null } - response = context.service - .path("api/entities/list/cluster/") - .header("Cookie", context.getAuthenticationToken()) - .type(MediaType.TEXT_XML) - .accept(MediaType.TEXT_XML) - .get(ClientResponse.class); - Assert.assertEquals(response.getStatus(), 200); - result = response.getEntity(EntityList.class); + result = falconUnitClient.getEntityList(EntityType.CLUSTER.name(), "", "", null, null, + null, null, null, new Integer(0), new Integer(1), null); Assert.assertNotNull(result); for (EntityList.EntityElement entityElement : result.getElements()) { Assert.assertNull(entityElement.status); // status is null } - response = context.service - .path("api/entities/list/feed,process/") - .header("Cookie", context.getAuthenticationToken()) - .type(MediaType.TEXT_XML) - .accept(MediaType.TEXT_XML) - .get(ClientResponse.class); - Assert.assertEquals(response.getStatus(), 200); - result = response.getEntity(EntityList.class); + result = falconUnitClient.getEntityList(EntityType.FEED.name() + "," + EntityType.PROCESS.name(), + "", "", null, null, null, null, null, new Integer(0), new Integer(1), null); Assert.assertNotNull(result); for (EntityList.EntityElement entityElement : result.getElements()) { Assert.assertNull(entityElement.status); // status is null } - response = context.service - .path("api/entities/list/") - .header("Cookie", context.getAuthenticationToken()) - .type(MediaType.TEXT_XML) - .accept(MediaType.TEXT_XML) - .get(ClientResponse.class); - Assert.assertEquals(response.getStatus(), 200); - result = response.getEntity(EntityList.class); + result = falconUnitClient.getEntityList(null, "", "", null, null, null, null, null, new Integer(0), + new Integer(1), null); Assert.assertNotNull(result); for (EntityList.EntityElement entityElement : result.getElements()) { Assert.assertNull(entityElement.status); // status is null @@ -932,21 +920,21 @@ public class EntityManagerJerseyIT { @Test public void testDuplicateUpdateCommands() throws Exception { - TestContext context = newContext(); - context.scheduleProcess(); + UnitTestContext context = newContext(); + schedule(context); OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING); List<BundleJob> bundles = OozieTestUtils.getBundles(context); Assert.assertEquals(bundles.size(), 1); - Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName); + Process process = (Process) getDefinition(EntityType.PROCESS, context.processName); String feed3 = "f3" + System.currentTimeMillis(); Map<String, String> overlay = new HashMap<String, String>(); overlay.put("inputFeedName", feed3); overlay.put("cluster", context.clusterName); overlay.put("user", System.getProperty("user.name")); - ClientResponse response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED); - context.assertSuccessful(response); + APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); Input input = new Input(); input.setFeed(feed3); @@ -958,13 +946,19 @@ public class EntityManagerJerseyIT { updateEndtime(process); Date endTime = getEndTime(); ExecutorService service = Executors.newSingleThreadExecutor(); - Future<ClientResponse> future = service.submit(new UpdateCommand(context, process, endTime)); - response = update(context, process, endTime, false); - ClientResponse duplicateUpdateThreadResponse = future.get(); + ExecutorService duplicateService = Executors.newSingleThreadExecutor(); - // since there are duplicate threads for updates, there is no guarantee which request will succeed - testDuplicateCommandsResponse(context, response, duplicateUpdateThreadResponse); + Future<APIResult> future = service.submit(new UpdateCommand(context, process, endTime)); + Future<APIResult> duplicateFuture = duplicateService.submit(new UpdateCommand(context, process, endTime)); + // since there are duplicate threads for updates, there is no guarantee which request will succeed + try { + future.get(); + duplicateFuture.get(); + Assert.fail("Exception should be Thrown"); + } catch (Exception e) { + //ignore + } } public Date getEndTime() { @@ -977,38 +971,38 @@ public class EntityManagerJerseyIT { return cal.getTime(); } - class UpdateCommand implements Callable<ClientResponse> { - private TestContext context; + class UpdateCommand implements Callable<APIResult> { + private UnitTestContext context; private Process process; private Date endTime; - public TestContext getContext() { + public UnitTestContext getContext() { return context; } public Process getProcess() { return process; } - public Date getEndTime() { - return endTime; - } - public UpdateCommand(TestContext context, Process process, Date endTime) { + public UpdateCommand(UnitTestContext context, Process process, Date endTime) { this.context = context; this.process = process; this.endTime = endTime; } @Override - public ClientResponse call() throws Exception { - return update(context, process, endTime, false); + public APIResult call() throws Exception { + File tmpFile = TestContext.getTempFile(); + process.getEntityType().getMarshaller().marshal(process, tmpFile); + return falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(), + tmpFile.getAbsolutePath(), true, null); } } - class SubmitCommand implements Callable<ClientResponse> { + class SubmitCommand implements Callable<APIResult> { private Map<String, String> overlay; - private TestContext context; + private UnitTestContext context; - public TestContext getContext() { + public UnitTestContext getContext() { return context; } @@ -1016,36 +1010,35 @@ public class EntityManagerJerseyIT { return overlay; } - public SubmitCommand(TestContext context, Map<String, String> overlay) { + public SubmitCommand(UnitTestContext context, Map<String, String> overlay) { this.context = context; this.overlay = overlay; } @Override - public ClientResponse call() throws Exception { - return context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED); + public APIResult call() throws Exception { + return submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay); } } - class DeleteCommand implements Callable<ClientResponse> { - private TestContext context; + class DeleteCommand implements Callable<APIResult> { + private UnitTestContext context; private String entityName; private String entityType; - public TestContext getContext() { + public UnitTestContext getContext() { return context; } - public DeleteCommand(TestContext context, String entityName, String entityType) { + public DeleteCommand(UnitTestContext context, String entityName, String entityType) { this.context = context; this.entityName = entityName; this.entityType = entityType; } @Override - public ClientResponse call() throws Exception { - return context.deleteFromFalcon(entityName, entityType); + public APIResult call() throws Exception { + return falconUnitClient.delete(EntityType.valueOf(entityType), entityName, null); } } - } http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java index 6458b59..769d059 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java +++ b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java @@ -20,6 +20,7 @@ package org.apache.falcon.resource; import org.apache.commons.io.FileUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.client.FalconCLIException; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.resource.InstancesResult.Instance; import org.apache.falcon.resource.InstancesResult.WorkflowStatus; @@ -34,6 +35,7 @@ import org.testng.annotations.Test; import javax.ws.rs.core.MediaType; import java.io.File; import java.io.IOException; +import java.util.Map; /** * Test class for Process Instance REST API. @@ -41,6 +43,7 @@ import java.io.IOException; public class ProcessInstanceManagerIT extends FalconUnitTestBase { private static final String START_INSTANCE = "2012-04-20T00:00Z"; + private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml"; @BeforeClass @Override @@ -59,6 +62,37 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { FileUtils.deleteQuietly(new File(".action.xml.crc")); } + private void submitFeeds(Map<String, String> overlay) throws IOException, FalconCLIException { + String tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE1, overlay); + APIResult result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE2, overlay); + result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + } + + + private void submitProcess(String template, Map<String, String> overlay) throws IOException, FalconCLIException { + String tmpFile = TestContext.overlayParametersOverTemplate(template, overlay); + APIResult result = falconUnitClient.submit(EntityType.PROCESS.name(), tmpFile, null); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + } + + private void schedule(UnitTestContext context) throws FalconCLIException, IOException, FalconException { + String scheduleTime = START_INSTANCE; + APIResult result = scheduleProcess(context.getProcessName(), scheduleTime, 1, context.getClusterName(), + getAbsolutePath(SLEEP_WORKFLOW), true, ""); + Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + } + + private void scheduleProcess(UnitTestContext context) throws Exception { + submitCluster(context.colo, context.clusterName, null); + context.prepare(); + submitFeeds(context.overlay); + submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay); + schedule(context); + } + protected void schedule(TestContext context) throws Exception { CurrentUser.authenticate(System.getProperty("user.name")); schedule(context, 1); @@ -120,8 +154,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { @Test public void testGetInstanceStatus() throws Exception { UnitTestContext context = new UnitTestContext(); - submitCluster(context.colo, context.clusterName, null); - context.scheduleProcess(); + scheduleProcess(context); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); String endTime = "2012-04-20T00:01Z"; InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), @@ -135,8 +168,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { @Test public void testGetInstanceStatusPagination() throws Exception { UnitTestContext context = new UnitTestContext(); - submitCluster(context.colo, context.clusterName, null); - context.scheduleProcess(); + scheduleProcess(context); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); String endTime = "2012-04-20T00:02Z"; InstancesResult response = context.getClient().getStatusOfInstances(EntityType.PROCESS.name(), @@ -151,8 +183,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { @Test public void testKillInstances() throws Exception { UnitTestContext context = new UnitTestContext(); - submitCluster(context.colo, context.clusterName, null); - context.scheduleProcess(); + scheduleProcess(context); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); String endTime = "2012-04-20T00:01Z"; context.getClient().killInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime, @@ -178,8 +209,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { @Test public void testReRunInstances() throws Exception { UnitTestContext context = new UnitTestContext(); - submitCluster(context.colo, context.clusterName, null); - context.scheduleProcess(); + scheduleProcess(context); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); String endTime = "2012-04-20T00:01Z"; context.getClient().killInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, endTime, @@ -208,8 +238,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { @Test public void testSuspendInstances() throws Exception { UnitTestContext context = new UnitTestContext(); - submitCluster(context.colo, context.clusterName, null); - context.scheduleProcess(); + scheduleProcess(context); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); String endTime = "2012-04-20T00:01Z"; context.getClient().suspendInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, @@ -228,8 +257,7 @@ public class ProcessInstanceManagerIT extends FalconUnitTestBase { @Test public void testResumesInstances() throws Exception { UnitTestContext context = new UnitTestContext(); - submitCluster(context.colo, context.clusterName, null); - context.scheduleProcess(); + scheduleProcess(context); waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE, WorkflowStatus.RUNNING); String endTime = "2012-04-20T00:01Z"; context.getClient().suspendInstances(EntityType.PROCESS.name(), context.processName, START_INSTANCE, http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java index 1d49353..b222305 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java +++ b/webapp/src/test/java/org/apache/falcon/resource/UnitTestContext.java @@ -20,7 +20,6 @@ package org.apache.falcon.resource; import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.unit.FalconUnit; import org.apache.falcon.unit.FalconUnitClient; @@ -28,8 +27,8 @@ import org.apache.falcon.util.DeploymentUtil; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.testng.Assert; +import java.io.File; import java.io.IOException; import java.util.Date; import java.util.HashMap; @@ -43,6 +42,7 @@ public class UnitTestContext { public static final String FEED_TEMPLATE1 = "/feed-template1.xml"; public static final String FEED_TEMPLATE2 = "/feed-template2.xml"; public static final String PROCESS_TEMPLATE = "/process-template.xml"; + public static final String SAMPLE_PROCESS_XML = "/process-version-0.xml"; protected String colo; protected String clusterName; @@ -54,7 +54,7 @@ public class UnitTestContext { private static FalconUnitClient client; private static FileSystem fs; protected static ConfigurationStore configStore; - private Map<String, String> overlay; + protected Map<String, String> overlay; public UnitTestContext() throws FalconException, IOException { client = FalconUnit.getClient(); @@ -63,6 +63,14 @@ public class UnitTestContext { overlay = getUniqueOverlay(); } + public String getProcessName() { + return processName; + } + + public String getClusterName() { + return clusterName; + } + public static FalconUnitClient getClient() { return client; } @@ -79,7 +87,7 @@ public class UnitTestContext { } } - private void prepare() throws Exception { + protected void prepare() throws Exception { mkdir(fs, new Path("/falcon"), new FsPermission((short) 511)); Path wfParent = new Path("/falcon/test"); @@ -96,28 +104,22 @@ public class UnitTestContext { mkdir(fs, outPath, new FsPermission((short) 511)); } - public void scheduleProcess() throws Exception { - scheduleProcess(PROCESS_TEMPLATE, overlay); + public static File getTempFile() throws IOException { + return getTempFile("test", ".xml"); } - public void scheduleProcess(String processTemplate, Map<String, String> uniqueOverlay) throws - Exception { - prepare(); - - String tmpFile = TestContext.overlayParametersOverTemplate(FEED_TEMPLATE1, uniqueOverlay); - APIResult result = client.submit(EntityType.FEED.name(), tmpFile, null); - Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); - - tmpFile = TestContext.overlayParametersOverTemplate(FEED_TEMPLATE2, uniqueOverlay); - result = client.submit(EntityType.FEED.name(), tmpFile, null); - Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + public static File getTempFile(String prefix, String suffix) throws IOException { + return getTempFile("target", prefix, suffix); + } - tmpFile = TestContext.overlayParametersOverTemplate(processTemplate, uniqueOverlay); - result = client.submit(EntityType.PROCESS.name(), tmpFile, null); - Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + @SuppressWarnings("ResultOfMethodCallIgnored") + public static File getTempFile(String path, String prefix, String suffix) throws IOException { + File f = new File(path); + if (!f.exists()) { + f.mkdirs(); + } - result = client.schedule(EntityType.PROCESS, processName, clusterName, true, null, null); - Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED); + return File.createTempFile(prefix, suffix, f); } public Map<String, String> getUniqueOverlay() throws FalconException { http://git-wip-us.apache.org/repos/asf/falcon/blob/2804c5d1/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java index 804b2ed..056c0a1 100644 --- a/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java +++ b/webapp/src/test/java/org/apache/falcon/util/OozieTestUtils.java @@ -24,6 +24,7 @@ import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.logging.JobLogMover; import org.apache.falcon.resource.TestContext; +import org.apache.falcon.resource.UnitTestContext; import org.apache.falcon.workflow.WorkflowExecutionContext; import org.apache.falcon.workflow.engine.OozieClientFactory; import org.apache.falcon.workflow.engine.OozieWorkflowEngine; @@ -54,6 +55,10 @@ public final class OozieTestUtils { return getOozieClient(context.getCluster().getCluster()); } + public static OozieClient getOozieClient(UnitTestContext context) throws FalconException { + return OozieClientFactory.get(context.getClusterName()); + } + public static OozieClient getOozieClient(Cluster cluster) throws FalconException { return OozieClientFactory.get(cluster); } @@ -68,6 +73,16 @@ public final class OozieTestUtils { return ozClient.getBundleJobsInfo("name=FALCON_PROCESS_" + context.getProcessName(), 0, 10); } + public static List<BundleJob> getBundles(UnitTestContext context) throws Exception { + List<BundleJob> bundles = new ArrayList<BundleJob>(); + if (context.getClusterName() == null) { + return bundles; + } + + OozieClient ozClient = OozieClientFactory.get(context.getClusterName()); + return ozClient.getBundleJobsInfo("name=FALCON_PROCESS_" + context.getProcessName(), 0, 10); + } + public static boolean killOozieJobs(TestContext context) throws Exception { if (context.getCluster() == null) { return true; @@ -133,6 +148,15 @@ public final class OozieTestUtils { waitForBundleStart(context, bundles.get(0).getId(), status); } + public static void waitForBundleStart(UnitTestContext context, Job.Status... status) throws Exception { + List<BundleJob> bundles = getBundles(context); + if (bundles.isEmpty()) { + return; + } + + waitForBundleStart(context, bundles.get(0).getId(), status); + } + public static void waitForBundleStart(TestContext context, String bundleId, Job.Status... status) throws Exception { OozieClient ozClient = getOozieClient(context); Set<Job.Status> statuses = new HashSet<Job.Status>(Arrays.asList(status)); @@ -162,6 +186,36 @@ public final class OozieTestUtils { throw new Exception("Bundle " + bundleId + " is not " + statuses + ". Last seen status " + bundleStatus); } + public static void waitForBundleStart(UnitTestContext context, String bundleId, Job.Status... status) throws + Exception { + OozieClient ozClient = getOozieClient(context); + Set<Job.Status> statuses = new HashSet<Job.Status>(Arrays.asList(status)); + + Status bundleStatus = null; + for (int i = 0; i < 15; i++) { + Thread.sleep(i * 1000); + BundleJob bundle = ozClient.getBundleJobInfo(bundleId); + bundleStatus = bundle.getStatus(); + if (statuses.contains(bundleStatus)) { + if (statuses.contains(Job.Status.FAILED) || statuses.contains(Job.Status.KILLED)) { + return; + } + + boolean done = false; + for (CoordinatorJob coord : bundle.getCoordinators()) { + if (statuses.contains(coord.getStatus())) { + done = true; + } + } + if (done) { + return; + } + } + System.out.println("Waiting for bundle " + bundleId + " in " + statuses + " state"); + } + throw new Exception("Bundle " + bundleId + " is not " + statuses + ". Last seen status " + bundleStatus); + } + public static WorkflowJob getWorkflowJob(Cluster cluster, String filter) throws Exception { OozieClient ozClient = getOozieClient(cluster);
