Repository: falcon Updated Branches: refs/heads/master 594566c01 -> f63a47f51
FALCON-1040 Modifying ProcessInstanceStatusTest to expose job id for running jobs in Falcon. Contributed by Pragya M Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f63a47f5 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f63a47f5 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f63a47f5 Branch: refs/heads/master Commit: f63a47f51f5fb6b4ea1e9d2b1291733c18e47ab0 Parents: 594566c Author: samarthg <[email protected]> Authored: Thu Feb 26 11:28:12 2015 +0530 Committer: samarthg <[email protected]> Committed: Thu Feb 26 11:28:12 2015 +0530 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 3 + .../regression/core/util/InstanceUtil.java | 20 +++++- .../falcon/regression/core/util/OozieUtil.java | 18 +++++ .../regression/ProcessInstanceStatusTest.java | 69 ++++++++++++++++++-- 4 files changed, 104 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/f63a47f5/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index ababa41..6e6ac7f 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -51,6 +51,9 @@ Trunk (Unreleased) via Samarth Gupta) IMPROVEMENTS + + FALCON-1040 Modifying ProcessInstanceStatusTest to expose job id for running jobs in + Falcon. (Pragya M via Samarth G) FALCON-1044 Add tests for the change that start and end are compulsory parameters for all instance POST apis.(Karishma G via Samarth) http://git-wip-us.apache.org/repos/asf/falcon/blob/f63a47f5/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java index 3524355..5d4e657 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java @@ -202,7 +202,7 @@ public final class InstanceUtil { List<InstancesResult.WorkflowStatus> statuses = new ArrayList<InstancesResult.WorkflowStatus>(); for (InstancesResult.Instance instance : instances) { final InstancesResult.WorkflowStatus status = instance.getStatus(); - LOGGER.info("status: "+ status + ", instance: " + instance.getInstance()); + LOGGER.info("status: " + status + ", instance: " + instance.getInstance()); statuses.add(status); } @@ -216,6 +216,24 @@ public final class InstanceUtil { killedCount, "Killed Instances"); } + public static List<String> getWorkflowJobIds(InstancesResult instancesResult) { + InstancesResult.Instance[] instances = instancesResult.getInstances(); + LOGGER.info("instances: " + Arrays.toString(instances)); + Assert.assertNotNull(instances, "instances should be not null"); + List<String> wfids = new ArrayList<String>(); + for (InstancesResult.Instance instance : instances) { + LOGGER.warn("instance: " + instance + " , status: " + + instance.getStatus() + ", logs : " + instance.getLogFile()); + if (instance.getStatus().name().equals("RUNNING") || instance.getStatus().name().equals("SUCCEEDED")) { + wfids.add(instance.getLogFile()); + } + if (instance.getStatus().name().equals("KILLED") || instance.getStatus().name().equals("WAITING")) { + Assert.assertNull(instance.getLogFile()); + } + } + return wfids; + } + /** * Checks that expected number of failed instances matches actual number of failed ones. * http://git-wip-us.apache.org/repos/asf/falcon/blob/f63a47f5/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java index d74864f..3cc171f 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java @@ -306,6 +306,24 @@ public final class OozieUtil { return workflowIds; } + public static List<String> getWorkflow(ColoHelper coloHelper, String bundleID) + throws OozieClientException { + OozieClient oozieClient = coloHelper.getClusterHelper().getOozieClient(); + waitForCoordinatorJobCreation(oozieClient, bundleID); + List<String> workflowIds = new ArrayList<String>(); + String coordId = InstanceUtil.getDefaultCoordIDFromBundle(oozieClient, bundleID); + CoordinatorJob coordJobInfo = oozieClient.getCoordJobInfo(coordId); + for (CoordinatorAction action : coordJobInfo.getActions()) { + if (action.getStatus().name().equals("RUNNING") || action.getStatus().name().equals("SUCCEEDED")) { + workflowIds.add(action.getExternalId()); + } + if (action.getStatus().name().equals("KILLED") || action.getStatus().name().equals("WAITING")) { + Assert.assertNull(action.getExternalId()); + } + } + return workflowIds; + } + public static Date getNominalTime(ColoHelper prismHelper, String bundleID) throws OozieClientException { OozieClient oozieClient = prismHelper.getClusterHelper().getOozieClient(); http://git-wip-us.apache.org/repos/asf/falcon/blob/f63a47f5/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java index e1e7dc1..b6440b8 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java @@ -52,6 +52,8 @@ import org.testng.annotations.Test; import javax.xml.bind.JAXBException; import java.io.IOException; import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; /** * Process instance status tests. @@ -107,7 +109,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass { /** * time out is set as 3 minutes .... getStatus is for a large range in past. * 6 instance should be materialized and one in running and other in waiting - * + * Adding logging information test as part of FALCON-813. + * In case status does not contain jobId of instance the test should fail. * @throws Exception */ @Test(groups = {"singleCluster"}) @@ -118,12 +121,16 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setProcessPeriodicity(1, TimeUnit.minutes); bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); + String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, Status.RUNNING, EntityType.PROCESS); + List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T10:20Z"); InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); InstanceUtil.validateResponse(r, 6, 1, 0, 5, 0); + List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); + Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); } /** @@ -157,13 +164,15 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); bundles[0].submitFeedsScheduleProcess(prism); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z"); + "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z"); InstanceUtil.validateResponse(r, 5, 0, 0, 5, 0); } /** * Schedule process and try to -getStatus without date parameters. Attempt should succeed. Start defaults * to start of entity and end defaults to end of entity. + * Adding logging information test as part of status information. + * In case status does not contain jobId of instance the test should fail. */ @Test(groups = {"singleCluster"}) public void testProcessInstanceStatusDateEmpty() @@ -172,35 +181,47 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:30Z"); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); + String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS); + List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null); InstanceUtil.validateResponse(r, 6, 5, 0, 1, 0); + List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); + Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); } /** * Schedule process with number of instances. Perform -getStatus request with valid * parameters. Attempt should succeed. - * + * Adding logging information test as part of status information. + * In case status does not contain jobId of instance the test should fail. + * * @throws Exception */ @Test(groups = {"singleCluster"}) public void testProcessInstanceStatusStartAndEnd() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); + String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1 , CoordinatorAction.Status.RUNNING, EntityType.PROCESS); + List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z"); InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); + List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); + Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); } /** * Schedule process. Perform -getStatus using -start parameter which is out of process * validity range. Attempt should succeed, with start defaulted to entity start time. + * Adding logging information test as part of status information. + * In case status does not contain jobId of instance the test should fail. * * @throws Exception */ @@ -209,13 +230,17 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); + String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); + List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T00:00Z&end=2010-01-02T01:21Z"); InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); + List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); + Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); } /** @@ -267,6 +292,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass { /** * Schedule process. Perform -getStatus using -start/-end parameters which are out of process * validity range. Attempt should succeed, with start/end defaulted to entity's start/end. + * Adding logging information test as part of status information. + * In case status does not contain jobId of instance the test should fail. * * @throws Exception */ @@ -276,13 +303,17 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessConcurrency(2); bundles[0].submitFeedsScheduleProcess(prism); + String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); + List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z"); InstanceUtil.validateResponse(r, 5, 2, 0, 3, 0); + List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); + Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); } /** @@ -316,19 +347,24 @@ public class ProcessInstanceStatusTest extends BaseTestClass { * Schedule process. -getStatus of it's first instance using only -start parameter which * points to start time of process validity. Check that response reflects expected status of * instance. - * + * Adding logging information test as part of status information. + * In case status does not contain jobId of instance the test should fail. * @throws Exception */ @Test(groups = {"singleCluster"}) public void testProcessInstanceStatusOnlyStart() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); + String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); + List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T01:00Z"); InstanceUtil.validateResponse(r, 5, 1, 0, 4, 0); + List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); + Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); } /** @@ -348,7 +384,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass { /** * Schedule process. Try to -getStatus without time range parameters. Attempt succeeds. - * + * Adding logging information test as part of status information. + * In case status does not contain jobId of instance the test should fail. * * @throws Exception */ @@ -357,12 +394,16 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setProcessConcurrency(5); bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); bundles[0].submitFeedsScheduleProcess(prism); + String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); + List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null); InstanceUtil.validateResponse(r, 5, 5, 0, 0, 0); + List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); + Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); } /** @@ -386,4 +427,22 @@ public class ProcessInstanceStatusTest extends BaseTestClass { "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z"); InstanceUtil.validateFailedInstances(r, 3); } + + /* + * Function to match the workflows obtained from instance status and oozie. + */ + private boolean matchWorkflows(List<String> instanceWf, List<String> oozieWf) { + Collections.sort(instanceWf); + Collections.sort(oozieWf); + if (instanceWf.size() != oozieWf.size()) { + return false; + } + for (int index = 0; index < instanceWf.size(); index++) { + if (!instanceWf.get(index).contains(oozieWf.get(index))) { + return false; + } + } + return true; + } + }
