http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java deleted file mode 100644 index 48c8021..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java +++ /dev/null @@ -1,369 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.falcon.resource.InstancesResult; -import org.apache.falcon.resource.InstancesResult.WorkflowStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.OozieClient; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.Arrays; -import java.util.List; - -/** - * Process instance kill tests. - */ -@Test(groups = "embedded") -public class ProcessInstanceKillsTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private OozieClient clusterOC = serverOC.get(0); - private FileSystem clusterFS = serverFS.get(0); - private String baseTestHDFSDir = cleanAndGetTestDir(); - private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; - private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN; - private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN; - private static final Logger LOGGER = Logger.getLogger(ProcessInstanceKillsTest.class); - private String processName; - - @BeforeClass(alwaysRun = true) - public void createTestData() throws Exception { - LOGGER.info("in @BeforeClass"); - HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - } - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - processName = bundles[0].getProcessName(); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Schedule process. Perform -kill action for only one instance. Check that action - * succeeded and only one instance was killed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillSingle() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:01Z"); - InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED); - } - - /** - * Schedule process. Check that in case when -start and -end parameters are equal zero - * instances are killed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillStartAndEndSame() throws Exception { - bundles[0].setProcessValidity("2010-01-02T00:00Z", "2010-01-02T04:00Z"); - bundles[0].setProcessConcurrency(2); - bundles[0].setProcessTimeOut(3, TimeUnit.minutes); - bundles[0].setProcessPeriodicity(1, TimeUnit.minutes); - bundles[0].setProcessConcurrency(10); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper() - .getProcessInstanceKill(processName, "?start=2010-01-02T00:03Z&end=2010-01-02T00:03Z"); - Assert.assertNull(r.getInstances(), "There should be zero instances killed"); - } - - /** - * Schedule process. Provide data for all instances except the last, - * thus making it non-materialized (waiting). Try to -kill last 3 instances. - * Check that only running instances were affected. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillKillNotRunning() throws Exception { - bundles[0].setProcessValidity("2010-01-02T00:00Z", "2010-01-02T00:26Z"); - bundles[0].setProcessTimeOut(3, TimeUnit.minutes); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setProcessConcurrency(6); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - - //create data for first 5 instances, 6th should be non-materialized - String bundleId = OozieUtil.getSequenceBundleID(clusterOC, processName, - EntityType.PROCESS, 0); - for(CoordinatorJob c : clusterOC.getBundleJobInfo(bundleId).getCoordinators()) { - List<CoordinatorAction> actions = clusterOC.getCoordJobInfo(c.getId()).getActions(); - if (actions.size() == 6) { - for(int i = 0; i < 5; i++) { - CoordinatorAction action = actions.get(i); - HadoopUtil.createHDFSFolders(cluster, Arrays - .asList(action.getMissingDependencies().split("#"))); - } - break; - } - } - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3); - InstancesResult r = prism.getProcessHelper() - .getProcessInstanceKill(processName, "?start=2010-01-02T00:14Z&end=2010-01-02T00:26Z"); - InstanceUtil.validateResponse(r, 3, 0, 0, 1, 2); - LOGGER.info(r.toString()); - } - - /** - * Generate data. Schedule process. Try to perform -kill - * operation using -start and -end which are both in future with respect to process start. - * - * @throws Exception TODO amend test with validations - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillBothStartAndEndInFuture01() throws Exception { - /* - both start and end r in future with respect to process start end - */ - String startTime = TimeUtil.getTimeWrtSystemTime(-20); - String endTime = TimeUtil.getTimeWrtSystemTime(400); - String startTimeData = TimeUtil.getTimeWrtSystemTime(-50); - String endTimeData = TimeUtil.getTimeWrtSystemTime(50); - - bundles[0].setInputFeedDataPath(feedInputPath.replace("input/", "input01/")); - bundles[0].setProcessValidity(startTime, endTime); - bundles[0].setProcessConcurrency(6); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - String startTimeRequest = TimeUtil.getTimeWrtSystemTime(-17); - String endTimeRequest = TimeUtil.getTimeWrtSystemTime(23); - InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, - "?start=" + startTimeRequest + "&end=" + endTimeRequest); - LOGGER.info(r.toString()); - } - - /** - * Schedule process. Check that -kill action is not performed when time range between -start - * and -end parameters is in future and don't include existing instances. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillBothStartAndEndInFuture() throws Exception { - /* - both start and end r in future with respect to current time - */ - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:21Z"); - bundles[0].setProcessConcurrency(6); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - String startTime = TimeUtil.getTimeWrtSystemTime(1); - String endTime = TimeUtil.getTimeWrtSystemTime(40); - InstancesResult r = prism.getProcessHelper() - .getProcessInstanceKill(processName, "?start=" + startTime + "&end=" + endTime); - LOGGER.info(r.getMessage()); - Assert.assertEquals(r.getInstances(), null); - } - - /** - * Schedule process. Perform -kill action within time range which includes 3 running instances. - * Get status of instances within wider range. Check that only mentioned 3 instances are - * killed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillMultipleInstance() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z"); - bundles[0].setProcessConcurrency(6); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - prism.getProcessHelper() - .getProcessInstanceKill(processName, "?start=2010-01-02T01:05Z&end=2010-01-02T01:16Z"); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z"); - InstanceUtil.validateResponse(r, 5, 2, 0, 0, 3); - } - - /** - * Schedule process. Perform -kill action on last expected instance. Get status of instances - * which are in wider range. Check that only last is killed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillLastInstance() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z"); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 10); - prism.getProcessHelper().getProcessInstanceKill(processName, - "?start=2010-01-02T01:20Z&end=2010-01-02T01:21Z"); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z"); - InstanceUtil.validateResponse(r, 5, 4, 0, 0, 1); - } - - /** - * Schedule process. Suspend one running instance. Perform -kill action on it. Check that - * mentioned instance is really killed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillSuspended() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:04Z"); - InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:04Z"); - InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED); - } - - /** - * Schedule single instance process. Wait till it finished. Try to kill the instance. Check - * that instance still succeeded. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillSucceeded() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:04Z"); - InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.SUCCEEDED); - } - - /** - * Schedule process. Perform -kill action using only -start parameter. Check that action - * succeeded and only one instance was killed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillWOEndParam() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, - "?start=2010-01-02T01:00Z"); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } - - /** - * Schedule process. Perform -kill action using only -end parameter. Check that action - * succeeded and only one instance was killed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillWOStartParam() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, - "?end=2010-01-02T01:01Z"); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } - - /** - * Schedule process. Perform -kill action without start or end params. Check that action - * succeeded and only one instance was killed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceKillWOParams() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, - null); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java deleted file mode 100644 index d5b4ef2..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java +++ /dev/null @@ -1,449 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.entity.v0.process.Properties; -import org.apache.falcon.entity.v0.process.Property; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.falcon.resource.InstancesResult; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.WorkflowJob.Status; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.List; - -/** - * Test Suite for instance rerun. - */ -@Test(groups = "embedded") -public class ProcessInstanceRerunTest extends BaseTestClass { - - private boolean restartRequired; - private String baseTestDir = cleanAndGetTestDir(); - private String aggregateWorkflowDir = baseTestDir + "/aggregator"; - private String feedInputPath = baseTestDir + "/input" + MINUTE_DATE_PATTERN; - private String feedOutputPath = baseTestDir + "/output-data" + MINUTE_DATE_PATTERN; - private String feedInputTimedOutPath = baseTestDir + "/timedout" + MINUTE_DATE_PATTERN; - private ColoHelper cluster = servers.get(0); - private FileSystem clusterFS = serverFS.get(0); - private OozieClient clusterOC = serverOC.get(0); - private static final Logger LOGGER = Logger.getLogger(ProcessInstanceRerunTest.class); - private static final double TIMEOUT = 10; - private String processName; - private String start = "?start=2010-01-02T01:00Z"; - - @BeforeClass(alwaysRun = true) - public void createTestData() throws Exception { - LOGGER.info("in @BeforeClass"); - HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - } - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); - processName = bundles[0].getProcessName(); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Schedule process. Kill some instances. Rerun some of that killed. Check that - * instances were rerun correctly and other are still killed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceRerunSomeKilled02() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, - start + "&end=2010-01-02T01:16Z"); - InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4); - List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName); - prism.getProcessHelper().getProcessInstanceRerun(processName, - start + "&end=2010-01-02T01:11Z"); - InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 5, 1, 0); - } - - /** - * Schedule process. Kill some instances. Rerun some of these killed without using -start or - * -end parameters. Should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceRerunKilledWOParams() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, - start + "&end=2010-01-02T01:16Z"); - InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4); - r = prism.getProcessHelper().getProcessInstanceRerun(processName, - null); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } - - /** - * Schedule process. Kill some instances. Rerun some of these killed using only - * -end parameter. Should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceRerunKilledWOStartParam() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, - start + "&end=2010-01-02T01:16Z"); - InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4); - r = prism.getProcessHelper().getProcessInstanceRerun(processName, - "?end=2010-01-02T01:11Z"); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } - - /** - * Schedule process. Kill some instances. Rerun some of these killed using only - * -start parameter. Should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceRerunKilledWOEndParam() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, - start + "&end=2010-01-02T01:16Z"); - InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4); - r = prism.getProcessHelper().getProcessInstanceRerun(processName, - start); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } - - /** - * Schedule process. Kill all instances. Rerun them. Check that they were rerun. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceRerunMultipleKilled() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(5); - String process = bundles[0].getProcessData(); - LOGGER.info("process: " + Util.prettyPrintXml(process)); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper() - .getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z"); - InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3); - List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName); - prism.getProcessHelper(). - getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:11Z"); - InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0); - } - - /** - * Schedule process. Kill some instances. Rerun them. Check that there are no killed - * instances left. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceRerunSomeKilled01() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(6); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper() - .getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z"); - InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3); - List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName); - prism.getProcessHelper().getProcessInstanceRerun(processName, - start + "&end=2010-01-02T01:11Z"); - TimeUtil.sleepSeconds(TIMEOUT); - InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 6, 0, 0); - } - - /** - * Schedule process. Kill single instance. Rerun it. Check it was rerun. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceRerunSingleKilled() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - prism.getProcessHelper().getProcessInstanceKill(processName, - start + "&end=2010-01-02T01:01Z"); - String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.KILLED).get(0); - prism.getProcessHelper().getProcessInstanceRerun(processName, - start + "&end=2010-01-02T01:01Z"); - Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID)); - } - - /** - * Schedule process. Wait till it got succeeded. Rerun first succeeded instance. Check if it - * is running. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceRerunSingleSucceeded() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(6); - bundles[0].submitFeedsScheduleProcess(prism); - String process = bundles[0].getProcessData(); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.RUNNING, - Status.SUCCEEDED).get(0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 0, CoordinatorAction - .Status.SUCCEEDED, EntityType.PROCESS); - prism.getProcessHelper().getProcessInstanceRerun(processName, - start + "&end=2010-01-02T01:01Z&force=true"); - Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID)); - } - - /** - * Schedule process. Suspend its instances. Try to rerun them. Check that instances weren't - * rerun and are still suspended. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceRerunSingleSuspended() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:06Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(2); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - prism.getProcessHelper().getProcessInstanceSuspend(processName, - start + "&end=2010-01-02T01:06Z"); - prism.getProcessHelper().getProcessInstanceRerun(processName, - start + "&end=2010-01-02T01:06Z"); - Assert.assertEquals(InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 1), - CoordinatorAction.Status.SUSPENDED); - } - - /** - * Schedule process. Wait till its instances succeed. Rerun them all. Check they are running. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceRerunMultipleSucceeded() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:08Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(2); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName); - prism.getProcessHelper().getProcessInstanceRerun(processName, - start + "&end=2010-01-02T01:11Z&force=true"); - InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 2, 2, 0, 0); - } - - /** - * Schedule process with invalid input feed data path. Wait till process got timed-out. Rerun - * it's instances. Check that they were rerun and are waiting (wait for input data). - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceRerunTimedOut() throws Exception { - bundles[0].setInputFeedDataPath(feedInputTimedOutPath); - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z"); - bundles[0].setProcessTimeOut(2, TimeUnit.minutes); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(3); - bundles[0].submitFeedsScheduleProcess(prism); - CoordinatorAction.Status s; - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS); - prism.getProcessHelper().getProcessInstanceRerun(processName, - start + "&end=2010-01-02T01:11Z"); - s = InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 0); - Assert.assertEquals(s, CoordinatorAction.Status.WAITING, - "instance should have been in WAITING state"); - } - - @Test(groups = {"singleCluster"}, timeOut = 1200000) - public void testProcessInstanceRerunFailedPostProcessing() throws Exception { - restartRequired=true; - bundles[0].setProcessValidity("2015-01-02T01:00Z", "2015-01-02T01:04Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - - String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - - //bring down Server1 colo - Util.shutDownService(cluster.getClusterHelper()); - - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - - //wait for instance to go in killing state - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.KILLED, EntityType.PROCESS, 5); - - Assert.assertEquals(OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "post-processing") - .contains("KILLED"), true); - Assert.assertEquals(OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "user-action") - .contains("SUCCEEDED"), true); - - //start Server1 colo - Util.startService(cluster.getClusterHelper()); - TimeUtil.sleepSeconds(10); - - prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2015-01-02T01:00Z&end=2015-01-02T01:04Z"); - - while (!OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "post-processing").contains("SUCCEEDED")) { - TimeUtil.sleepSeconds(10); - } - } - - @Test(groups = {"singleCluster"}, timeOut = 1200000) - public void testProcessInstanceRerunFailedWorkflowAction() throws Exception { - - // Defining path to be used in pig script - String propPath = cleanAndGetTestDir() + "/rerun"; - org.apache.falcon.entity.v0.process.Process processElement = bundles[0].getProcessObject(); - Properties properties = new Properties(); - Property propertyInput = new Property(); - propertyInput.setName("inputPath"); - propertyInput.setValue(propPath); - - Property propertyOutput = new Property(); - propertyOutput.setName("outputPath"); - propertyOutput.setValue(propPath + "/output"); - properties.getProperties().add(propertyInput); - properties.getProperties().add(propertyOutput); - processElement.setProperties(properties); - bundles[0].setProcessData(processElement.toString()); - - HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.MULTIPLE_ACTION_WORKFLOW); - HadoopUtil.copyDataToFolder(clusterFS, aggregateWorkflowDir, OSUtil.concat(OSUtil.PIG_DIR, "id.pig")); - - bundles[0].setProcessValidity("2015-01-02T01:00Z", "2015-01-02T01:04Z"); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - - String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - - //wait for instance to get killed - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.KILLED, EntityType.PROCESS, 5); - - Assert.assertEquals(OozieUtil.getWorkflowActionStatus(clusterOC, bundleId, "user-action") - .contains("KILLED"), true); - Assert.assertEquals(OozieUtil.getSubWorkflowActionStatus(clusterOC, bundleId, "user-action", "pig") - .contains("KILLED"), true); - Assert.assertEquals(OozieUtil.getSubWorkflowActionStatus(clusterOC, bundleId, "user-action", "aggregator") - .contains("SUCCEEDED"), true); - - HadoopUtil.uploadDir(clusterFS, propPath, OSUtil.MULTIPLE_ACTION_WORKFLOW); - - prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2015-01-02T01:00Z&end=2015-01-02T01:04Z"); - - while (!OozieUtil.getSubWorkflowActionStatus(clusterOC, bundleId, "user-action", "pig").contains("SUCCEEDED")) { - TimeUtil.sleepSeconds(10); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java deleted file mode 100644 index b7f4428..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java +++ /dev/null @@ -1,293 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.falcon.resource.InstancesResult; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.OozieClient; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - - -/** - * Process instance resume tests. - */ -@Test(groups = "embedded") -public class ProcessInstanceResumeTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private FileSystem clusterFS = serverFS.get(0); - private OozieClient clusterOC = serverOC.get(0); - private String baseTestHDFSDir = cleanAndGetTestDir(); - private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN; - private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN; - private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; - private static final Logger LOGGER = Logger.getLogger(ProcessInstanceResumeTest.class); - private String processName; - private String wholeRange = "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z"; - - @BeforeClass(alwaysRun = true) - public void createTestData() throws Exception { - LOGGER.info("in @BeforeClass"); - HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - } - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setProcessConcurrency(6); - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z"); - processName = bundles[0].getProcessName(); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Schedule process. Suspend some instances. Attempt to -resume instance using single -end - * parameter. Instances up to the end date (exclusive) will be resumed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceResumeOnlyEnd() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z"); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - wholeRange); - InstanceUtil.validateResponse(r, 6, 2, 4, 0, 0); - r = prism.getProcessHelper().getProcessInstanceResume(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z"); - InstanceUtil.validateResponse(r, 3, 3, 0, 0, 0); - } - - /** - * Schedule process. Suspend some instances. Try to perform -resume using time range which - * effects only on one instance. Check that this instance was resumed es expected. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceResumeResumeSome() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z"); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - wholeRange); - InstanceUtil.validateResponse(r, 6, 2, 4, 0, 0); - prism.getProcessHelper().getProcessInstanceResume(processName, - "?start=2010-01-02T01:05Z&end=2010-01-02T01:16Z"); - r = prism.getProcessHelper().getProcessInstanceStatus(processName, wholeRange); - InstanceUtil.validateResponse(r, 6, 5, 1, 0, 0); - } - - /** - * Schedule process. Suspend some instances. Try to perform -resume using time range which - * effects on all instances. Check that there are no suspended instances. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceResumeResumeMany() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - String withinRange = "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z"; - prism.getProcessHelper().getProcessInstanceSuspend(processName, withinRange); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - wholeRange); - InstanceUtil.validateResponse(r, 6, 2, 4, 0, 0); - prism.getProcessHelper().getProcessInstanceResume(processName, withinRange); - r = prism.getProcessHelper().getProcessInstanceStatus(processName, wholeRange); - InstanceUtil.validateResponse(r, 6, 6, 0, 0, 0); - } - - /** - * Schedule process. Suspend first instance. Resume that instance using only -start parameter. - * Check that mentioned instance was resumed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceResumeSingle() throws Exception { - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 2); - String param = "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z"; - prism.getProcessHelper().getProcessInstanceSuspend(processName, param); - prism.getProcessHelper().getProcessInstanceResume(processName, param); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, param); - InstanceUtil.validateResponse(r, 6, 1, 0, 5, 0); - } - - /** - * Attempt to resume instances of non-existent process should fail with an appropriate - * status code. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceResumeNonExistent() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - InstancesResult r = prism.getProcessHelper().getProcessInstanceResume("invalidName", - "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z"); - InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND); - } - - /** - * Attempt to perform -resume action without time range parameters should fail with an - + appropriate status code or message. Will fail now due to jira: https://issues.apache.org/jira/browse/FALCON-710 - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceResumeNoParams() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z"); - InstancesResult r = prism.getProcessHelper().getProcessInstanceResume(processName, null); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } - - /** - * Attempt to perform -resume action without -end parameter. Should fail with an - + appropriate status code or message. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceResumeWOEndParam() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z"); - InstancesResult r = prism.getProcessHelper().getProcessInstanceResume(processName, "?start=2010-01-02T01:00Z"); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } - - /** - * Attempt to perform -resume action without -start parameter. Should fail with an - + appropriate status code or message. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceResumeWOStartParam() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z"); - InstancesResult r = prism.getProcessHelper().getProcessInstanceResume(processName, "?end=2010-01-02T01:15Z"); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } - - /** - * Schedule process, remove it. Try to -resume it's instance. Attempt should fail with - * an appropriate status code. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceResumeDeleted() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - prism.getProcessHelper().delete(bundles[0].getProcessData()); - InstancesResult r = prism.getProcessHelper().getProcessInstanceResume(processName, - "?start=2010-01-02T01:05Z"); - InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND); - } - - /** - * Schedule process. Try to resume entity which wasn't suspended. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceResumeNonSuspended() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - String start = "?start=2010-01-02T01:05Z&end=2010-01-02T01:26Z"; - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, start); - InstanceUtil.validateResponse(r, 5, 5, 0, 0, 0); - r = prism.getProcessHelper().getProcessInstanceResume(processName, start); - InstanceUtil.validateResponse(r, 5, 5, 0, 0, 0); - } - - /** - * Schedule process. Suspend last instance. Resume it using parameter which points to - * expected materialization time of last instance. Check that there are no suspended - * instances among all which belong to current process. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceResumeLastInstance() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - String last = "?start=2010-01-02T01:25Z&end=2010-01-02T01:26Z"; - prism.getProcessHelper().getProcessInstanceSuspend(processName, last); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - wholeRange); - InstanceUtil.validateResponse(r, 6, 5, 1, 0, 0); - prism.getProcessHelper().getProcessInstanceResume(processName, last); - r = prism.getProcessHelper().getProcessInstanceStatus(processName, wholeRange); - InstanceUtil.validateResponse(r, 6, 6, 0, 0, 0); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java deleted file mode 100644 index 98fdcca..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.falcon.resource.InstancesResult; -import org.apache.falcon.resource.InstancesResult.WorkflowStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.Job; -import org.apache.oozie.client.OozieClient; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - - -/** - * Regression for instance running api. - */ -@Test(groups = "embedded") -public class ProcessInstanceRunningTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private FileSystem clusterFS = serverFS.get(0); - private OozieClient clusterOC = serverOC.get(0); - private String baseTestHDFSDir = cleanAndGetTestDir(); - private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; - private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN; - private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN; - private static final Logger LOGGER = Logger.getLogger(ProcessInstanceRunningTest.class); - private static final double TIMEOUT = 15; - private String processName; - - @BeforeClass(alwaysRun = true) - public void createTestData() throws Exception { - LOGGER.info("in @BeforeClass"); - HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - } - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z"); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedLocationData(feedOutputPath); - processName = bundles[0].getProcessName(); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Run process. Suspend it and then resume. Get all -running instances. Response should - * contain all process instances. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void getResumedProcessInstance() throws Exception { - bundles[0].setProcessConcurrency(3); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - String process = bundles[0].getProcessData(); - AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(process)); - TimeUtil.sleepSeconds(TIMEOUT); - AssertUtil.assertSucceeded(prism.getProcessHelper().resume(process)); - TimeUtil.sleepSeconds(TIMEOUT); - InstancesResult r = prism.getProcessHelper().getRunningInstance(processName); - InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); - } - - /** - * Run process. Suspend it. Try to get -running instances. Response should be - * successful but shouldn't contain any instance. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void getSuspendedProcessInstance() throws Exception { - bundles[0].setProcessConcurrency(3); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(bundles[0].getProcessData())); - TimeUtil.sleepSeconds(TIMEOUT); - InstancesResult r = prism.getProcessHelper().getRunningInstance(processName); - InstanceUtil.validateSuccessWOInstances(r); - } - - /** - * Run process. Get -running instances. Check that response contains expected number of - * instances. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void getRunningProcessInstance() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper().getRunningInstance(processName); - InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); - } - - /** - * Attempt to get -running instances of nonexistent process should result in error. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void getNonExistenceProcessInstance() throws Exception { - InstancesResult r = prism.getProcessHelper().getRunningInstance("invalidName"); - InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND); - } - - /** - * Attempt to get -running instances of deleted process should result in error. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void getKilledProcessInstance() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - prism.getProcessHelper().delete(bundles[0].getProcessData()); - InstancesResult r = prism.getProcessHelper().getRunningInstance(processName); - InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND); - } - - /** - * Launch process and wait till it got succeeded. Try to get -running instances. Response - * should reflect success but shouldn't contain any of instances. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void getSucceededProcessInstance() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - OozieUtil.waitForBundleToReachState(clusterOC, processName, Job.Status.SUCCEEDED); - InstancesResult r = prism.getProcessHelper().getRunningInstance(processName); - InstanceUtil.validateSuccessWOInstances(r); - } -}
