Repository: falcon Updated Branches: refs/heads/master 207443e9d -> 28fd15c49
FALCON-1215 Adding new test cases related to rerun feature. Contributed by Pragya M Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/041de1de Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/041de1de Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/041de1de Branch: refs/heads/master Commit: 041de1def1e88430534dd9ced63f983bf711b351 Parents: 207443e Author: samarthg <[email protected]> Authored: Mon Jun 8 11:49:31 2015 +0530 Committer: samarthg <[email protected]> Committed: Mon Jun 8 11:49:31 2015 +0530 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 3 + .../falcon/regression/core/util/OSUtil.java | 4 + .../falcon/regression/core/util/OozieUtil.java | 47 +++++++- .../regression/ProcessInstanceRerunTest.java | 112 +++++++++++++++++-- .../MultipleActionWorkflow/workflow.xml | 75 +++++++++++++ 5 files changed, 230 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/041de1de/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index fa0ac0c..189277b 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -5,6 +5,9 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + + FALCON-1215 Adding new test cases related to rerun feature (Pragya M via Samarth Gupta) + FALCON-1249 Tests for process setup wizard (Namit Maheshwari and Paul Isaychuk) FALCON-1242 Search UI test for entity upload button (Namit Maheshwari) http://git-wip-us.apache.org/repos/asf/falcon/blob/041de1de/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java index ab27ccf..d1f1c24 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OSUtil.java @@ -47,6 +47,10 @@ public final class OSUtil { public static final String OOZIE_LIB_FOLDER = String.format(RESOURCES + "oozieLib%s", SEPARATOR); + public static final String MULTIPLE_ACTION_WORKFLOW = + String.format(RESOURCES + "MultipleActionWorkflow%s", SEPARATOR); + public static final String PIG_DIR = + String.format(RESOURCES + "pig%s", SEPARATOR); public static String getPath(String... pathParts) { http://git-wip-us.apache.org/repos/asf/falcon/blob/041de1de/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java index 15b0497..ef7d887 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java @@ -23,11 +23,12 @@ import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.helpers.entity.AbstractEntityHelper; import org.apache.oozie.client.AuthOozieClient; import org.apache.oozie.client.BundleJob; -import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; import org.apache.oozie.client.Job; +import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.WorkflowAction; import org.joda.time.DateTime; import org.apache.log4j.Logger; import org.joda.time.DateTimeZone; @@ -49,6 +50,8 @@ import java.util.TreeMap; * helper methods for oozie . */ public final class OozieUtil { + + public static final String FAIL_MSG = "NO_such_workflow_exists"; private OozieUtil() { throw new AssertionError("Instantiating utility class..."); } @@ -705,4 +708,46 @@ public final class OozieUtil { return OSUtil.IS_WINDOWS ? 60 : 30; } } + + public static String getActionStatus(OozieClient oozieClient, String workflowId, String actionName) + throws OozieClientException { + List<WorkflowAction> wfAction = oozieClient.getJobInfo(workflowId).getActions(); + for (WorkflowAction wf : wfAction) { + if (wf.getName().contains(actionName)) { + return wf.getExternalStatus(); + } + } + return ""; + } + + public static String getWorkflowActionStatus(OozieClient oozieClient, String bundleId, String actionName) + throws OozieClientException { + List<String> workflowIds = getWorkflowJobs(oozieClient, bundleId); + if (workflowIds.get(0).isEmpty()) { + return FAIL_MSG; + } + return getActionStatus(oozieClient, workflowIds.get(0), actionName); + } + + public static String getSubWorkflowActionStatus(OozieClient oozieClient, String bundleId, + String actionName, String subAction) + throws OozieClientException { + List<String> workflowIds = getWorkflowJobs(oozieClient, bundleId); + if (workflowIds.get(0).isEmpty()) { + return FAIL_MSG; + } + + String wid=""; + List<WorkflowAction> wfAction = oozieClient.getJobInfo(workflowIds.get(0)).getActions(); + for (WorkflowAction wf : wfAction) { + if (wf.getName().contains(actionName)) { + wid = wf.getExternalId(); + } + } + + if (!wid.isEmpty()) { + return getActionStatus(oozieClient, wid, subAction); + } + return FAIL_MSG; + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/041de1de/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java index f65f9c9..ef3d8a7 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java @@ -18,6 +18,7 @@ package org.apache.falcon.regression; +import org.apache.falcon.entity.v0.process.*; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency.TimeUnit; @@ -51,6 +52,7 @@ import java.util.List; @Test(groups = "embedded") public class ProcessInstanceRerunTest extends BaseTestClass { + private boolean restartRequired; private String baseTestDir = cleanAndGetTestDir(); private String aggregateWorkflowDir = baseTestDir + "/aggregator"; private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN; @@ -111,7 +113,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4); List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName); prism.getProcessHelper().getProcessInstanceRerun(processName, - start + "&end=2010-01-02T01:11Z"); + start + "&end=2010-01-02T01:11Z"); InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 5, 1, 0); } @@ -203,7 +205,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); + CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); InstancesResult r = prism.getProcessHelper() .getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z"); InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3); @@ -234,7 +236,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3); List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName); prism.getProcessHelper().getProcessInstanceRerun(processName, - start + "&end=2010-01-02T01:11Z"); + start + "&end=2010-01-02T01:11Z"); TimeUtil.sleepSeconds(TIMEOUT); InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 6, 0, 0); } @@ -253,7 +255,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); + CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); prism.getProcessHelper().getProcessInstanceKill(processName, start + "&end=2010-01-02T01:01Z"); String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.KILLED).get(0); @@ -282,7 +284,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.RUNNING, Status.SUCCEEDED).get(0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 0, CoordinatorAction - .Status.SUCCEEDED, EntityType.PROCESS); + .Status.SUCCEEDED, EntityType.PROCESS); prism.getProcessHelper().getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:01Z&force=true"); Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID)); @@ -305,9 +307,9 @@ public class ProcessInstanceRerunTest extends BaseTestClass { InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); prism.getProcessHelper().getProcessInstanceSuspend(processName, - start + "&end=2010-01-02T01:06Z"); + start + "&end=2010-01-02T01:06Z"); prism.getProcessHelper().getProcessInstanceRerun(processName, - start + "&end=2010-01-02T01:06Z"); + start + "&end=2010-01-02T01:06Z"); Assert.assertEquals(InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 1), CoordinatorAction.Status.SUSPENDED); } @@ -326,7 +328,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName); prism.getProcessHelper().getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:11Z&force=true"); @@ -351,9 +353,99 @@ public class ProcessInstanceRerunTest extends BaseTestClass { InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS); prism.getProcessHelper().getProcessInstanceRerun(processName, - start + "&end=2010-01-02T01:11Z"); + start + "&end=2010-01-02T01:11Z"); s = InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 0); Assert.assertEquals(s, CoordinatorAction.Status.WAITING, - "instance should have been in WAITING state"); + "instance should have been in WAITING state"); + } + + @Test(groups = {"singleCluster"}, timeOut = 1200000) + public void testProcessInstanceRerunFailedPostProcessing() throws Exception { + restartRequired=true; + bundles[0].setProcessValidity("2015-01-02T01:00Z", "2015-01-02T01:04Z"); + bundles[0].setOutputFeedLocationData(feedOutputPath); + bundles[0].setProcessConcurrency(1); + bundles[0].submitFeedsScheduleProcess(prism); + + String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + + //bring down Server1 colo + Util.shutDownService(cluster.getClusterHelper()); + + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + + //wait for instance to go in killing state + InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, + CoordinatorAction.Status.KILLED, EntityType.PROCESS, 5); + + Assert.assertEquals(OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "post-processing") + .contains("KILLED"), true); + Assert.assertEquals(OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "user-action") + .contains("SUCCEEDED"), true); + + //start Server1 colo + Util.startService(cluster.getClusterHelper()); + TimeUtil.sleepSeconds(10); + + prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2015-01-02T01:00Z&end=2015-01-02T01:04Z"); + + while (!OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "post-processing").contains("SUCCEEDED")) { + TimeUtil.sleepSeconds(10); + } + } + + @Test(groups = {"singleCluster"}, timeOut = 1200000) + public void testProcessInstanceRerunFailedWorkflowAction() throws Exception { + + // Defining path to be used in pig script + String propPath = cleanAndGetTestDir() + "/rerun"; + org.apache.falcon.entity.v0.process.Process processElement = bundles[0].getProcessObject(); + Properties properties = new Properties(); + Property propertyInput = new Property(); + propertyInput.setName("inputPath"); + propertyInput.setValue(propPath); + + Property propertyOutput = new Property(); + propertyOutput.setName("outputPath"); + propertyOutput.setValue(propPath + "/output"); + properties.getProperties().add(propertyInput); + properties.getProperties().add(propertyOutput); + processElement.setProperties(properties); + bundles[0].setProcessData(processElement.toString()); + + HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.MULTIPLE_ACTION_WORKFLOW); + HadoopUtil.copyDataToFolder(clusterFS, aggregateWorkflowDir, OSUtil.PIG_DIR + "id.pig"); + + bundles[0].setProcessValidity("2015-01-02T01:00Z", "2015-01-02T01:04Z"); + bundles[0].setOutputFeedLocationData(feedOutputPath); + bundles[0].setProcessConcurrency(1); + bundles[0].submitFeedsScheduleProcess(prism); + + String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); + + //wait for instance to get killed + InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, + CoordinatorAction.Status.KILLED, EntityType.PROCESS, 5); + + Assert.assertEquals(OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "user-action") + .contains("KILLED"), true); + Assert.assertEquals(OozieUtil.getSubWorkflowActionStatus(clusterOC, bundleId, "user-action", "pig") + .contains("KILLED"), true); + Assert.assertEquals(OozieUtil.getSubWorkflowActionStatus(clusterOC, bundleId, "user-action", "aggregator") + .contains("SUCCEEDED"), true); + + HadoopUtil.uploadDir(clusterFS, propPath, OSUtil.MULTIPLE_ACTION_WORKFLOW); + + prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2015-01-02T01:00Z&end=2015-01-02T01:04Z"); + + while (!OozieUtil.getSubWorkflowActionStatus(clusterOC, bundleId, "user-action", "pig").contains("SUCCEEDED")) { + TimeUtil.sleepSeconds(10); + } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/041de1de/falcon-regression/merlin/src/test/resources/MultipleActionWorkflow/workflow.xml ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/resources/MultipleActionWorkflow/workflow.xml b/falcon-regression/merlin/src/test/resources/MultipleActionWorkflow/workflow.xml new file mode 100644 index 0000000..8733f90 --- /dev/null +++ b/falcon-regression/merlin/src/test/resources/MultipleActionWorkflow/workflow.xml @@ -0,0 +1,75 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<workflow-app xmlns="uri:oozie:workflow:0.2" name="aggregator-wf"> + <start to="aggregator"/> + <action name="aggregator"> + <map-reduce> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <prepare> + <delete path="${outputData}"/> + </prepare> + <configuration> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>mapred.mapper.class</name> + <value>org.apache.hadoop.mapred.lib.IdentityMapper</value> + </property> + <property> + <name>mapred.reducer.class</name> + <value>org.apache.hadoop.mapred.lib.IdentityReducer</value> + </property> + <property> + <name>mapred.map.tasks</name> + <value>1</value> + </property> + <property> + <name>mapred.input.dir</name> + <value>${inputData}</value> + </property> + <property> + <name>mapred.output.dir</name> + <value>${outputData}</value> + </property> + </configuration> + </map-reduce> + <ok to="pigAction"/> + <error to="failMapRed"/> + </action> + + <action name="pigAction"> + <pig> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <script>id.pig</script> + <param>INPUT=${inputPath}</param> + <param>OUTPUT=${outputPath}</param> + </pig> + <ok to="end"/> + <error to="failPig"/> + </action> + + <kill name="failMapRed"> + <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> + </kill> + <kill name="failPig"> + <message>Pig action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> + </kill> + <end name="end"/> +</workflow-app>
