http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java deleted file mode 100644 index 14ecfe4..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java +++ /dev/null @@ -1,495 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.falcon.resource.APIResult; -import org.apache.falcon.resource.InstancesResult; -import org.apache.falcon.resource.InstancesResult.WorkflowStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.log4j.Logger; -import org.apache.oozie.client.CoordinatorAction.Status; -import org.apache.oozie.client.Job; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.OozieClientException; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import javax.xml.bind.JAXBException; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.List; - -/** - * Process instance status tests. - */ -@Test(groups = "embedded") -public class ProcessInstanceStatusTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private FileSystem clusterFS = serverFS.get(0); - private String baseTestHDFSDir = cleanAndGetTestDir(); - private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; - private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN; - private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN; - private String feedInputTimedOutPath = baseTestHDFSDir + "/timedoutStatus" - + MINUTE_DATE_PATTERN; - private String feedOutputTimedOutPath = - baseTestHDFSDir + "/output-data/timedoutStatus" + MINUTE_DATE_PATTERN; - private static final Logger LOGGER = Logger.getLogger(ProcessInstanceStatusTest.class); - private static final double TIMEOUT = 15; - private String processName; - private OozieClient clusterOC = serverOC.get(0); - - @BeforeClass(alwaysRun = true) - public void createTestData() throws Exception { - LOGGER.info("in @BeforeClass"); - HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - } - - /** - * Configures general process definition which particular properties can be overwritten. - */ - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z"); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - processName = bundles[0].getProcessName(); - HadoopUtil.deleteDirIfExists(baseTestHDFSDir + "/input", clusterFS); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * time out is set as 3 minutes .... getStatus is for a large range in past. - * 6 instance should be materialized and one in running and other in waiting - * Adding logging information test as part of FALCON-813. - * In case status does not contain jobId of instance the test should fail. - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusStartAndEndCheckNoInstanceAfterEndDate() - throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-03T10:22Z"); - bundles[0].setProcessTimeOut(3, TimeUnit.minutes); - bundles[0].setProcessPeriodicity(1, TimeUnit.minutes); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, Status.RUNNING, EntityType.PROCESS); - List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T10:20Z"); - InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); - InstanceUtil.validateResponse(r, 6, 1, 0, 5, 0); - List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); - Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); - } - - /** - * Perform -getStatus using only -start parameter within time-range of non-materialized - * instances. There should be no instances returned in response. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusOnlyStartAfterMat() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-03T10:22Z"); - bundles[0].setProcessTimeOut(3, TimeUnit.minutes); - bundles[0].setProcessPeriodicity(1, TimeUnit.minutes); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T05:00Z"); - AssertUtil.assertSucceeded(r); - Assert.assertEquals(r.getInstances(), null); - } - - /** - * Schedule process. Perform -getStatus using -end parameter which is out of process - * validity range. Attempt should succeed with end defaulted to entity end. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusEndOutOfRange() throws Exception { - HadoopUtil.deleteDirIfExists(baseTestHDFSDir + "/input", clusterFS); - bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z"); - InstanceUtil.validateResponse(r, 5, 0, 0, 5, 0); - } - - /** - * Schedule process and try to -getStatus without date parameters. Attempt should succeed. Start defaults - * to start of entity and end defaults to end of entity. - * Adding logging information test as part of status information. - * In case status does not contain jobId of instance the test should fail. - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusDateEmpty() - throws JAXBException, AuthenticationException, IOException, URISyntaxException, - OozieClientException, InterruptedException { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:30Z"); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 5, - Status.RUNNING, EntityType.PROCESS); - List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null); - InstanceUtil.validateResponse(r, 6, 5, 0, 1, 0); - List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); - Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); - } - - /** - * Schedule process with number of instances. Perform -getStatus request with valid - * parameters. Attempt should succeed. - * Adding logging information test as part of status information. - * In case status does not contain jobId of instance the test should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusStartAndEnd() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1 , - Status.RUNNING, EntityType.PROCESS); - List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z"); - InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); - List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); - Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); - } - - /** - * Schedule process. Perform -getStatus using -start parameter which is out of process - * validity range. Attempt should succeed, with start defaulted to entity start time. - * Adding logging information test as part of status information. - * In case status does not contain jobId of instance the test should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusStartOutOfRange() throws Exception { - bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - Status.RUNNING, EntityType.PROCESS, 5); - List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T00:00Z&end=2010-01-02T01:21Z"); - InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); - List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); - Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); - } - - /** - * Schedule and then delete process. Try to get the status of its instances. Attempt should - * fail with an appropriate code. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusKilled() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - AssertUtil.assertSucceeded(prism.getProcessHelper().delete(bundles[0].getProcessData())); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z"); - InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND); - } - - /** - * Schedule process and then suspend it. -getStatus of first instance only -start parameter. - * Instance should be suspended. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusOnlyStartSuspended() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, Status.RUNNING, EntityType.PROCESS); - AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(bundles[0].getProcessData())); - TimeUtil.sleepSeconds(TIMEOUT); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z"); - Assert.assertEquals(r.getStatus(), APIResult.Status.SUCCEEDED); - Assert.assertEquals(InstanceUtil.instancesInResultWithStatus(r, WorkflowStatus.SUSPENDED), 1); - } - - /** - * Schedule process. Try to -getStatus using -start/-end parameters with values which were - * reversed i.e. -start is further than -end. Attempt should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusReverseDateRange() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); - InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1, - Status.RUNNING, EntityType.PROCESS); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:20Z&end=2010-01-02T01:07Z"); - InstanceUtil.validateError(r, ResponseErrors.START_BEFORE_SCHEDULED); - } - - /** - * Schedule process. Perform -getStatus using -start/-end parameters which are out of process - * validity range. Attempt should succeed, with start/end defaulted to entity's start/end. - * Adding logging information test as part of status information. - * In case status does not contain jobId of instance the test should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusStartEndOutOfRange() throws Exception { - bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(2); - bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, - Status.RUNNING, EntityType.PROCESS, 5); - List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z"); - InstanceUtil.validateResponse(r, 5, 2, 0, 3, 0); - List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); - Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); - } - - /** - * Schedule process. Suspend and then resume it. -getStatus of its instances. Check that - * response reflects that instances are running. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusResumed() throws Exception { - bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedLocationData(feedOutputPath); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - String process = bundles[0].getProcessData(); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - Status.RUNNING, EntityType.PROCESS, 5); - AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(process)); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.SUSPENDED); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, Status.SUSPENDED, EntityType.PROCESS, 3); - TimeUtil.sleepSeconds(TIMEOUT); - AssertUtil.assertSucceeded(prism.getProcessHelper().resume(process)); - TimeUtil.sleepSeconds(TIMEOUT); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - Status.RUNNING, EntityType.PROCESS, 5); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z"); - InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); - } - - /** - * Schedule process. -getStatus of it's first instance using only -start parameter which - * points to start time of process validity. Check that response reflects expected status of - * instance. - * Adding logging information test as part of status information. - * In case status does not contain jobId of instance the test should fail. - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusOnlyStart() throws Exception { - bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, - Status.RUNNING, EntityType.PROCESS, 5); - List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z"); - InstanceUtil.validateResponse(r, 5, 1, 0, 4, 0); - List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); - Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); - } - - /** - * Schedule process. Try to perform -getStatus using valid -start parameter but invalid - * process name. Attempt should fail with an appropriate status code. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusInvalidName() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T02:30Z"); - bundles[0].submitFeedsScheduleProcess(prism); - InstancesResult r = prism.getProcessHelper() - .getProcessInstanceStatus("invalidProcess", "?start=2010-01-01T01:00Z"); - InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND); - } - - /** - * Schedule process. Try to -getStatus without time range parameters. Attempt succeeds. - * Adding logging information test as part of status information. - * In case status does not contain jobId of instance the test should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusWoParams() throws Exception { - bundles[0].setProcessConcurrency(5); - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); - bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - Status.RUNNING, EntityType.PROCESS, 5); - List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null); - InstanceUtil.validateResponse(r, 5, 5, 0, 0, 0); - List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); - Assert.assertTrue(matchWorkflows(instanceWfIDs, oozieWfIDs), "No job ids exposed in status message"); - } - - /** - * Schedule process with timeout set to 2 minutes. Wait till it become timed-out. -getStatus - * of that process. Check that all materialized instances are failed. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceStatusTimedOut() throws Exception { - bundles[0].setInputFeedDataPath(feedInputTimedOutPath); - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z"); - bundles[0].setProcessTimeOut(2, TimeUnit.minutes); - bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedLocationData(feedOutputTimedOutPath); - bundles[0].setProcessConcurrency(3); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, Status.TIMEDOUT, - EntityType.PROCESS); - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z"); - InstanceUtil.validateFailedInstances(r, 3); - } - - /** - * Check that default end time param value is now. - */ - @Test - public void testDefaultEndTimeParam() - throws OozieClientException, IOException, InterruptedException, AuthenticationException, URISyntaxException, - JAXBException { - //set validity to have 12 instances - String start = TimeUtil.getTimeWrtSystemTime(-60); - String end = TimeUtil.getTimeWrtSystemTime(0); - bundles[0].setProcessValidity(start, end); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - bundles[0].setProcessConcurrency(3); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - //make first 3 instances running - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 1); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 2); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, Status.RUNNING, - EntityType.PROCESS); - //check instances status with end, expected first 10 instances - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=" + start + "&end=" + TimeUtil.addMinsToTime(end, -11)); - InstanceUtil.validateResponse(r, 10, 3, 0, 7, 0); - //request the same but without end, expected to have the latest 10 instances - r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=" + start); - InstanceUtil.validateResponse(r, 10, 1, 0, 9, 0); - //the same with numResults which includes/excludes all running instances - r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=" + start + "&end=" + TimeUtil.addMinsToTime(end, -16) + "&numResults=9"); - InstanceUtil.validateResponse(r, 9, 3, 0, 6, 0); - //expected end is set to now, thus getting last 9 instances - r = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=" + start + "&numResults=9"); - InstanceUtil.validateResponse(r, 9, 0, 0, 9, 0); - } - - /* - * Function to match the workflows obtained from instance status and oozie. - */ - private boolean matchWorkflows(List<String> instanceWf, List<String> oozieWf) { - Collections.sort(instanceWf); - Collections.sort(oozieWf); - if (instanceWf.size() != oozieWf.size()) { - return false; - } - for (int index = 0; index < instanceWf.size(); index++) { - if (!instanceWf.get(index).contains(oozieWf.get(index))) { - return false; - } - } - return true; - } - -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java deleted file mode 100644 index 4a27a0a..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java +++ /dev/null @@ -1,268 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.falcon.resource.InstancesResult; -import org.apache.hadoop.fs.FileSystem; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.OozieClient; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.io.IOException; - - -/** - * Process instance suspend tests. - */ -@Test(groups = "embedded") -public class ProcessInstanceSuspendTest extends BaseTestClass { - - private String baseTestHDFSDir = cleanAndGetTestDir(); - private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN; - private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN; - private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; - private ColoHelper cluster = servers.get(0); - private FileSystem clusterFS = serverFS.get(0); - private String processName; - private OozieClient clusterOC = serverOC.get(0); - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].setOutputFeedLocationData(feedOutputPath); - processName = bundles[0].getProcessName(); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() throws IOException { - removeTestClassEntities(); - HadoopUtil.deleteDirIfExists(baseTestHDFSDir, clusterFS); - } - - /** - * Schedule process. Try to suspend instances with start/end parameters which are - * wider then process validity range. Succeeds. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceSuspendLargeRange() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z"); - InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0); - prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z"); - result = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z"); - InstanceUtil.validateResponse(result, 5, 0, 5, 0, 0); - } - - /** - * Schedule single-instance process. Wait till instance succeed. Try to suspend - * succeeded instance. Action should be performed successfully as indempotent action. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceSuspendSucceeded() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); - bundles[0].setProcessConcurrency(1); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:01Z"); - AssertUtil.assertSucceeded(r); - } - - /** - * Schedule process. Check that all instances are running. Suspend them. Check that all are - * suspended. In every action valid time range is used. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceSuspendAll() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z"); - InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0); - prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z"); - result = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z"); - InstanceUtil.validateResponse(result, 5, 0, 5, 0, 0); - } - - /** - * Schedule process and try to perform -suspend action without date range parameters. - * Attempt should fail. Will fail because of jira : https://issues.apache.org/jira/browse/FALCON-710 - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceSuspendWoParams() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z"); - bundles[0].submitFeedsScheduleProcess(prism); - InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName, null); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } - - /** - * Schedule process with 3 running and 2 waiting instances expected. Suspend ones which are - * running. Check that now 3 are suspended and 2 are still waiting. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceSuspendStartAndEnd() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); - bundles[0].setProcessConcurrency(3); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 1); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 2); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z"); - InstanceUtil.validateResponse(result, 5, 3, 0, 2, 0); - prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z"); - result = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z"); - InstanceUtil.validateResponse(result, 5, 0, 3, 2, 0); - } - - /** - * Try to suspend process which wasn't submitted and scheduled. Action should fail. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceSuspendNonExistent() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - InstancesResult r = prism.getProcessHelper() - .getProcessInstanceSuspend("invalidName", "?start=2010-01-02T01:20Z"); - InstanceUtil.validateError(r, ResponseErrors.PROCESS_NOT_FOUND); - } - - /** - * Schedule process. Perform -suspend action using only -start parameter which points to start - * time of process. Attempt suspends all instances - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceSuspendOnlyStart() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z"); - bundles[0].setProcessConcurrency(3); - bundles[0].submitFeedsScheduleProcess(prism); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T01:00Z"); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } - - /** - * Schedule process. Perform -suspend action using only -end parameter. - * Should fail with appropriate status message. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceSuspendOnlyEnd() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z"); - bundles[0].setProcessConcurrency(3); - bundles[0].submitFeedsScheduleProcess(prism); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?end=2010-01-02T01:05Z"); - InstanceUtil.validateError(r, ResponseErrors.UNPARSEABLE_DATE); - } - - /** - * Schedule process with a number of instances running. Perform -suspend action using params - * such that they aim to suspend the last instance. Check that only - * the last instance is suspended. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void testProcessInstanceSuspendSuspendLast() throws Exception { - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); - bundles[0].setProcessConcurrency(5); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, - CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z"); - InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0); - prism.getProcessHelper().getProcessInstanceSuspend(processName, - "?start=2010-01-02T01:20Z&end=2010-01-02T01:23Z"); - result = prism.getProcessHelper().getProcessInstanceStatus(processName, - "?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z"); - InstanceUtil.validateResponse(result, 5, 4, 1, 0, 0); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java deleted file mode 100644 index 6a12fc8..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java +++ /dev/null @@ -1,352 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.LateInput; -import org.apache.falcon.entity.v0.process.LateProcess; -import org.apache.falcon.entity.v0.process.PolicyType; -import org.apache.falcon.regression.Entities.ProcessMerlin; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.Job; -import org.apache.oozie.client.OozieClient; -import org.testng.Assert; -import org.testng.TestNGException; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.List; - -/** - * Process late data test. - */ -@Test(groups = "embedded") -public class ProcessLateRerunTest extends BaseTestClass { - private ColoHelper cluster1 = servers.get(0); - private OozieClient cluster1OC = serverOC.get(0); - private FileSystem cluster1FS = serverFS.get(0); - private String baseTestHDFSDir = cleanAndGetTestDir(); - private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; - private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN; - private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN; - private static final Logger LOGGER = Logger.getLogger(ProcessLateRerunTest.class); - - @BeforeClass(alwaysRun = true) - public void uploadWorkflow() throws Exception { - uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - } - - @BeforeMethod(alwaysRun = true) - public void setUp() throws Exception { - Bundle bundle = BundleUtil.readLateDataBundle(); - bundles[0] = new Bundle(bundle, servers.get(0)); - bundles[0].generateUniqueBundle(this); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setOutputFeedLocationData(feedOutputPath); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Test demonstrates rerunning process for late arrival of data. - * Initially there is no input data and empty folders are processed. - * It checks the number of rerun attempts once late data has been added - * ensuring that late rerun happened. - */ - @Test(enabled = true) - public void testProcessLateRerunOnEmptyFolder() throws Exception { - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 30); - LOGGER.info("Time range between : " + startTime + " and " + endTime); - bundles[0].setProcessValidity(startTime, endTime); - bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes); - bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes); - bundles[0].setProcessConcurrency(2); - - String inputName = bundles[0].getProcessObject().getFirstInputName(); - bundles[0].setProcessLatePolicy(getLateData(2, "minutes", "periodic", inputName, aggregateWorkflowDir)); - - bundles[0].submitAndScheduleProcess(); - AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); - TimeUtil.sleepSeconds(10); - InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0); - - getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 1); - - int sleepMins = 6; - for(int i=0; i < sleepMins; i++) { - LOGGER.info("Waiting..."); - TimeUtil.sleepSeconds(60); - } - InstanceUtil.waitTillInstanceReachState(cluster1OC, bundles[0].getProcessName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - - List<String> bundleList = OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(), - bundles[0].getProcessName(), EntityType.PROCESS); - String bundleID = bundleList.get(0); - - OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1); - } - - /** - * Test demonstrates rerunning process for late arrival of data. - * Initially there is some data which is processed. It checks the number of rerun attempts - * once further more data has been added ensuring that late rerun happened. - */ - @Test(enabled = true) - public void testProcessLateRerunWithData() throws Exception { - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 30); - LOGGER.info("Time range between : " + startTime + " and " + endTime); - bundles[0].setProcessValidity(startTime, endTime); - bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes); - bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes); - bundles[0].setProcessConcurrency(2); - - String inputName = bundles[0].getProcessObject().getFirstInputName(); - - bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir)); - bundles[0].submitAndScheduleProcess(); - AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); - TimeUtil.sleepSeconds(10); - InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0); - - getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, true, 1); - - int sleepMins = 6; - for(int i=0; i < sleepMins; i++) { - LOGGER.info("Waiting..."); - TimeUtil.sleepSeconds(60); - } - InstanceUtil.waitTillInstanceReachState(cluster1OC, bundles[0].getProcessName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - - List<String> bundleList = OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(), - bundles[0].getProcessName(), EntityType.PROCESS); - String bundleID = bundleList.get(0); - - OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1); - } - - /** - * Test demonstrates rerunning process for late arrival of data for multiple input folders. - * It checks the number of rerun attempts once further more data has been added ensuring that late rerun happened. - */ - @Test(enabled = true) - public void testProcessLateRerunWithMultipleFolders() throws Exception { - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 30); - String startInstance = "now(0,-5)"; - String endInstance = "now(0,0)"; - LOGGER.info("Time range between : " + startTime + " and " + endTime); - bundles[0].setProcessValidity(startTime, endTime); - bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes); - bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes); - String inputName = bundles[0].getProcessObject().getFirstInputName(); - - bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir)); - bundles[0].setProcessConcurrency(2); - - // Increase the window of input for process - bundles[0].setDatasetInstances(startInstance, endInstance); - bundles[0].submitAndScheduleProcess(); - - AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); - TimeUtil.sleepSeconds(10); - InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0); - - getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 3); - - int sleepMins = 6; - for(int i=0; i < sleepMins; i++) { - LOGGER.info("Waiting..."); - TimeUtil.sleepSeconds(60); - } - InstanceUtil.waitTillInstanceReachState(cluster1OC, bundles[0].getProcessName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - - List<String> bundleList = OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(), - bundles[0].getProcessName(), EntityType.PROCESS); - String bundleID = bundleList.get(0); - - OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1); - } - - /** - * Test demonstrates rerunning process for late arrival of data for gate folders. - * Late rerun will not work on gate folder, so no retry attempt on the appended data. - */ - @Test(enabled = true) - public void testProcessLateRerunWithGate() throws Exception { - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 30); - String startInstance = "now(0,-5)"; - String endInstance = "now(0,0)"; - LOGGER.info("Time range between : " + startTime + " and " + endTime); - bundles[0].setProcessValidity(startTime, endTime); - bundles[0].setProcessPeriodicity(10, Frequency.TimeUnit.minutes); - bundles[0].setOutputFeedPeriodicity(10, Frequency.TimeUnit.minutes); - bundles[0].setProcessConcurrency(2); - - // Increase the window of input for process - bundles[0].setDatasetInstances(startInstance, endInstance); - - ProcessMerlin process = bundles[0].getProcessObject(); - String inputName = process.getFirstInputName(); - Input tempFeed = process.getInputs().getInputs().get(0); - - Input gateInput = new Input(); - gateInput.setName("Gate"); - gateInput.setFeed(tempFeed.getFeed()); - gateInput.setEnd("now(0,1)"); - gateInput.setStart("now(0,1)"); - process.getInputs().getInputs().add(gateInput); - bundles[0].setProcessData(process.toString()); - - bundles[0].setProcessLatePolicy(getLateData(4, "minutes", "periodic", inputName, aggregateWorkflowDir)); - - bundles[0].submitAndScheduleProcess(); - AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); - - TimeUtil.sleepSeconds(10); - InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0); - - getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 7); - - int sleepMins = 6; - for(int i=0; i < sleepMins; i++) { - LOGGER.info("Waiting..."); - TimeUtil.sleepSeconds(60); - } - - InstanceUtil.waitTillInstanceReachState(cluster1OC, bundles[0].getProcessName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - - List<String> bundleList = OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(), - bundles[0].getProcessName(), EntityType.PROCESS); - String bundleID = bundleList.get(0); - - OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 0); - } - - /* - dataFlag - denotes whether process should run initially on empty folders or folders containing data - dataFolder - denotes the folder where you want to upload data for late rerun - */ - private void getAndCreateDependencies(ColoHelper prismHelper, Bundle bundle, - OozieClient oozieClient, FileSystem clusterFS, - boolean dataFlag, int dataFolder) { - try { - List<String> bundles = null; - for (int i = 0; i < 10; ++i) { - bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(), - bundle.getProcessName(), EntityType.PROCESS); - if (bundles.size() > 0) { - break; - } - TimeUtil.sleepSeconds(30); - } - Assert.assertTrue(bundles != null && bundles.size() > 0, "Bundle job not created."); - String bundleID = bundles.get(0); - LOGGER.info("bundle id: " + bundleID); - List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, bundleID); - for (int i = 0; i < 10 && missingDependencies == null; ++i) { - TimeUtil.sleepSeconds(30); - missingDependencies = OozieUtil.getMissingDependencies(oozieClient, bundleID); - } - Assert.assertNotNull(missingDependencies, "Missing dependencies not found."); - - //print missing dependencies - for (String dependency : missingDependencies) { - LOGGER.info("dependency from job: " + dependency); - } - - //create missing dependencies - LOGGER.info("Creating missing dependencies..."); - OozieUtil.createMissingDependencies(prismHelper, EntityType.PROCESS, bundle.getProcessName(), 0, 0); - - //Adding data to empty folders depending on dataFlag - if (dataFlag) { - int tempCount = 1; - for (String location : missingDependencies) { - if (tempCount==1) { - LOGGER.info("Transferring data to : " + location); - HadoopUtil.copyDataToFolder(clusterFS, location, - OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml")); - tempCount++; - } - } - } - - //Process succeeding on empty folders - LOGGER.info("Waiting for process to succeed..."); - InstanceUtil.waitTillInstanceReachState(oozieClient, bundle.getProcessName(), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - - TimeUtil.sleepSeconds(30); - - //Adding data to check late rerun - int tempCounter = 1; - for (String dependency : missingDependencies) { - if (tempCounter==dataFolder) { - LOGGER.info("Transferring late data to : " + dependency); - HadoopUtil.copyDataToFolder(clusterFS, dependency, - OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.properties")); - } - tempCounter++; - } - - } catch (Exception e) { - e.printStackTrace(); - throw new TestNGException(e); - } - } - - private static LateProcess getLateData(int delay, String delayUnits, String retryType, - String inputData, String workflowDir) { - LateInput lateInput = new LateInput(); - lateInput.setInput(inputData); - lateInput.setWorkflowPath(workflowDir); - LateProcess lateProcess = new LateProcess(); - lateProcess.setDelay(new Frequency(delayUnits + "(" + delay + ")")); - lateProcess.setPolicy(PolicyType.fromValue(retryType)); - lateProcess.getLateInputs().add(lateInput); - return lateProcess; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java deleted file mode 100644 index 8422796..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.commons.io.FileUtils; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; -import org.apache.oozie.client.Job.Status; -import org.apache.oozie.client.OozieClient; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.FileOutputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.List; -import java.util.Map; - -/** - * Tests with process lib folder with workflow.xml. - */ -@Test(groups = "embedded") -public class ProcessLibPathLoadTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private OozieClient clusterOC = serverOC.get(0); - private FileSystem clusterFS = serverFS.get(0); - private String testDir = cleanAndGetTestDir(); - private String aggregateWorkflowDir = testDir + "/aggregator"; - private static final Logger LOGGER = Logger.getLogger(ProcessLibPathLoadTest.class); - - private String oozieLib = MerlinConstants.OOZIE_EXAMPLE_LIB; - private String oozieLibName = oozieLib.substring(oozieLib.lastIndexOf('/') + 1); - private String filename = OSUtil.concat(OSUtil.OOZIE_LIB_FOLDER, "lib", oozieLibName); - private String processName; - private String process; - - @BeforeClass(alwaysRun = true) - public void createTestData() throws Exception { - LOGGER.info("in @BeforeClass"); - FileUtils.forceMkdir(new File(OSUtil.concat(OSUtil.OOZIE_LIB_FOLDER, "lib"))); - saveUrlToFile(oozieLib); - } - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - uploadDirToClusters(aggregateWorkflowDir, OSUtil.OOZIE_LIB_FOLDER); - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].setInputFeedDataPath(testDir + MINUTE_DATE_PATTERN); - bundles[0].setProcessValidity("2015-01-02T01:00Z", "2015-01-02T01:04Z"); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedLocationData(testDir + "/output-data" + MINUTE_DATE_PATTERN); - bundles[0].setProcessConcurrency(1); - bundles[0].setProcessLibPath(aggregateWorkflowDir + "/lib"); - process = bundles[0].getProcessData(); - processName = Util.readEntityName(process); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - @AfterClass(alwaysRun = true) - public void deleteJar() throws IOException { - File file = new File(filename); - Assert.assertEquals(file.delete(), true, filename + " is not present."); - FileUtils.deleteDirectory(new File(OSUtil.concat(OSUtil.OOZIE_LIB_FOLDER, "lib"))); - } - - /** - * Test which test a process with jar in lib location. - * Schedule a process, it should succeed. - * - * @throws Exception - */ - @Test - public void setRightJarInWorkflowLib() throws Exception { - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED); - } - - /** - * Test which test a process with no jar in lib location. - * Schedule a process, it should get killed. - * - * @throws Exception - */ - @Test - public void setNoJarInWorkflowLibLocation() throws Exception { - HadoopUtil.deleteDirIfExists(aggregateWorkflowDir + "/lib/" + oozieLibName, clusterFS); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.KILLED); - } - - /** - * Function to download jar at remote public location. - * @param urlString public location from where jar is to be downloaded - * filename is the location where the jar is to be saved - * @throws Exception - */ - private void saveUrlToFile(String urlString) - throws IOException { - - URL url = new URL(urlString); - String link; - HttpURLConnection http = (HttpURLConnection) url.openConnection(); - Map<String, List<String>> header = http.getHeaderFields(); - while (isRedirected(header)) { - link = header.get("Location").get(0); - url = new URL(link); - http = (HttpURLConnection) url.openConnection(); - header = http.getHeaderFields(); - } - - InputStream input = http.getInputStream(); - byte[] buffer = new byte[4096]; - int n; - OutputStream output = new FileOutputStream(new File(filename)); - while ((n = input.read(buffer)) != -1) { - output.write(buffer, 0, n); - } - output.close(); - } - - private static boolean isRedirected(Map<String, List<String>> header) { - for (String hv : header.get(null)) { - if (hv.contains(" 301 ") || hv.contains(" 302 ")) { - return true; - } - } - return false; - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java deleted file mode 100644 index 4196d99..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Logger; -import org.apache.oozie.client.Job.Status; -import org.apache.oozie.client.OozieClient; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.List; - -/** - * Tests with process lib folder detached from workflow.xml. - */ -@Test(groups = "embedded") -public class ProcessLibPathTest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private OozieClient clusterOC = serverOC.get(0); - private FileSystem clusterFS = serverFS.get(0); - private String testDir = cleanAndGetTestDir(); - private String testLibDir = testDir + "/TestLib"; - private static final Logger LOGGER = Logger.getLogger(ProcessLibPathTest.class); - private String processName; - private String process; - - @BeforeClass(alwaysRun = true) - public void createTestData() throws Exception { - LOGGER.info("in @BeforeClass"); - Bundle b = BundleUtil.readELBundle(); - b.generateUniqueBundle(this); - b = new Bundle(b, cluster); - String startDate = "2010-01-01T22:00Z"; - String endDate = "2010-01-02T03:00Z"; - b.setInputFeedDataPath(testDir + "/input" + MINUTE_DATE_PATTERN); - String prefix = b.getFeedDataPathPrefix(); - HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS); - List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20); - HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates); - } - - @BeforeMethod(alwaysRun = true) - public void setup() throws Exception { - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].setInputFeedDataPath(testDir + MINUTE_DATE_PATTERN); - bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); - bundles[0].setProcessPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); - bundles[0].setOutputFeedLocationData(testDir + "/output-data" + MINUTE_DATE_PATTERN); - bundles[0].setProcessConcurrency(1); - bundles[0].setProcessLibPath(testLibDir); - process = bundles[0].getProcessData(); - processName = Util.readEntityName(process); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Tests a process with no lib folder in workflow location. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void setDifferentLibPathWithNoLibFolderInWorkflowfLocaltion() throws Exception { - String workflowDir = testLibDir + "/aggregatorLib1/"; - HadoopUtil.uploadDir(clusterFS, workflowDir, OSUtil.RESOURCES_OOZIE); - bundles[0].setProcessWorkflow(workflowDir); - LOGGER.info("processData: " + Util.prettyPrintXml(process)); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED); - } - - /** - * Test which test a process with wrong jar in lib folder in workflow location. - * - * @throws Exception - */ - @Test(groups = {"singleCluster"}) - public void setDifferentLibPathWithWrongJarInWorkflowLib() throws Exception { - String workflowDir = testLibDir + "/aggregatorLib2/"; - HadoopUtil.uploadDir(clusterFS, workflowDir, OSUtil.RESOURCES_OOZIE); - HadoopUtil.recreateDir(clusterFS, workflowDir + "/lib"); - HadoopUtil.copyDataToFolder(clusterFS, workflowDir + "/lib/invalid.jar", - OSUtil.concat(OSUtil.NORMAL_INPUT, "dataFile.xml")); - bundles[0].setProcessWorkflow(workflowDir); - LOGGER.info("processData: " + Util.prettyPrintXml(process)); - bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java deleted file mode 100644 index f4c9b30..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessSLATest.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.regression.Entities.ProcessMerlin; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.response.ServiceResponse; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.log4j.Logger; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - - -/** -* Process SLA tests. -*/ -@Test(groups = "embedded") -public class ProcessSLATest extends BaseTestClass { - - private ColoHelper cluster = servers.get(0); - private String baseTestHDFSDir = cleanAndGetTestDir(); - private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; - private static final Logger LOGGER = Logger.getLogger(ProcessSLATest.class); - - @BeforeClass(alwaysRun = true) - public void uploadWorkflow() throws Exception { - uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - } - - @BeforeMethod(alwaysRun = true) - public void setUp() throws Exception { - String startTime = TimeUtil.getTimeWrtSystemTime(0); - String endTime = TimeUtil.addMinsToTime(startTime, 20); - LOGGER.info("Time range between : " + startTime + " and " + endTime); - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster); - bundles[0].generateUniqueBundle(this); - bundles[0].submitClusters(prism); - bundles[0].setInputFeedDataPath(baseTestHDFSDir + MINUTE_DATE_PATTERN); - bundles[0].setOutputFeedLocationData(baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN); - bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes); - bundles[0].submitFeeds(prism); - bundles[0].setProcessConcurrency(1); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].setProcessValidity(startTime, endTime); - bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - - /** - * Schedule process with correctly adjusted sla. Response should reflect success. - * - */ - @Test - public void scheduleValidProcessSLA() throws Exception { - - ProcessMerlin processMerlin = bundles[0].getProcessObject(); - processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), - new Frequency("6", Frequency.TimeUnit.hours)); - bundles[0].setProcessData(processMerlin.toString()); - ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString()); - AssertUtil.assertSucceeded(response); - } - - /** - * Schedule process with slaStart and slaEnd having equal value. Response should reflect success. - * - */ - @Test - public void scheduleProcessWithSameSLAStartSLAEnd() throws Exception { - - ProcessMerlin processMerlin = bundles[0].getProcessObject(); - processMerlin.setSla(new Frequency("3", Frequency.TimeUnit.hours), - new Frequency("3", Frequency.TimeUnit.hours)); - bundles[0].setProcessData(processMerlin.toString()); - ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString()); - AssertUtil.assertSucceeded(response); - } - - /** - * Schedule process with slaEnd less than slaStart. Response should reflect failure. - * - */ - @Test - public void scheduleProcessWithSLAEndLowerthanSLAStart() throws Exception { - - ProcessMerlin processMerlin = bundles[0].getProcessObject(); - processMerlin.setSla(new Frequency("4", Frequency.TimeUnit.hours), - new Frequency("2", Frequency.TimeUnit.hours)); - bundles[0].setProcessData(processMerlin.toString()); - ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString()); - LOGGER.info("response : " + response.getMessage()); - - String message = "shouldStartIn of Process: " + processMerlin.getSla().getShouldStartIn().getTimeUnit() + "(" - + processMerlin.getSla().getShouldStartIn().getFrequency() + ")is greater than shouldEndIn: " - + processMerlin.getSla().getShouldEndIn().getTimeUnit() +"(" - + processMerlin.getSla().getShouldEndIn().getFrequency() + ")"; - validate(response, message); - } - - /** - * Schedule process with timeout greater than slaStart. Response should reflect success. - * - */ - @Test - public void scheduleProcessWithTimeoutGreaterThanSLAStart() throws Exception { - - ProcessMerlin processMerlin = bundles[0].getProcessObject(); - processMerlin.setTimeout(new Frequency("3", Frequency.TimeUnit.hours)); - processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours), - new Frequency("4", Frequency.TimeUnit.hours)); - bundles[0].setProcessData(processMerlin.toString()); - ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString()); - AssertUtil.assertSucceeded(response); - } - - /** - * Schedule process with timeout less than slaStart. Response should reflect failure. - * - */ - @Test - public void scheduleProcessWithTimeoutLessThanSLAStart() throws Exception { - - ProcessMerlin processMerlin = bundles[0].getProcessObject(); - processMerlin.setTimeout(new Frequency("1", Frequency.TimeUnit.hours)); - processMerlin.setSla(new Frequency("2", Frequency.TimeUnit.hours), - new Frequency("4", Frequency.TimeUnit.hours)); - bundles[0].setProcessData(processMerlin.toString()); - ServiceResponse response = prism.getProcessHelper().submitAndSchedule(processMerlin.toString()); - - String message = "shouldStartIn of Process: " + processMerlin.getSla().getShouldStartIn().getTimeUnit() + "(" - + processMerlin.getSla().getShouldStartIn().getFrequency() + ") is greater than timeout: " - +processMerlin.getTimeout().getTimeUnit() +"(" + processMerlin.getTimeout().getFrequency() + ")"; - validate(response, message); - } - - private void validate(ServiceResponse response, String message) throws Exception { - AssertUtil.assertFailed(response); - LOGGER.info("Expected message is : " + message); - Assert.assertTrue(response.getMessage().contains(message), - "Correct response was not present in process schedule. Process response is : " - + response.getMessage()); - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessUpdateTest.java deleted file mode 100644 index dbb45a6..0000000 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessUpdateTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.regression; - -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.process.LateInput; -import org.apache.falcon.entity.v0.process.LateProcess; -import org.apache.falcon.entity.v0.process.PolicyType; -import org.apache.falcon.regression.Entities.ProcessMerlin; -import org.apache.falcon.regression.core.bundle.Bundle; -import org.apache.falcon.regression.core.util.AssertUtil; -import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; -import org.apache.falcon.regression.core.util.OSUtil; -import org.apache.falcon.regression.core.util.OozieUtil; -import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.log4j.Logger; -import org.apache.oozie.client.OozieClient; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -/** - * Tests related to update feature. - */ -@Test(groups = "embedded") -public class ProcessUpdateTest extends BaseTestClass { - - private OozieClient clusterOC = serverOC.get(0); - private String baseTestHDFSDir = cleanAndGetTestDir(); - private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; - private String feedInputPath = baseTestHDFSDir + "/input" + MINUTE_DATE_PATTERN; - private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN; - private static final Logger LOGGER = Logger.getLogger(ProcessUpdateTest.class); - - @BeforeClass(alwaysRun = true) - public void uploadWorkflow() throws Exception { - uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); - } - - @BeforeMethod(alwaysRun = true) - public void setUp() throws Exception { - Bundle bundle = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundle, servers.get(0)); - bundles[0].generateUniqueBundle(this); - bundles[0].setProcessWorkflow(aggregateWorkflowDir); - bundles[0].setInputFeedDataPath(feedInputPath); - bundles[0].setOutputFeedLocationData(feedOutputPath); - } - - /** - * Test for https://issues.apache.org/jira/browse/FALCON-99. - * Scenario: schedule a process which doesn't have late data handling and then update it to have it. - * Check that new coordinator was created. - */ - @Test - public void updateProcessWithLateData() throws Exception { - String start = TimeUtil.getTimeWrtSystemTime(-60); - String end = TimeUtil.getTimeWrtSystemTime(10); - bundles[0].submitAndScheduleAllFeeds(); - ProcessMerlin process = bundles[0].getProcessObject(); - process.setValidity(start, end); - process.setLateProcess(null); - prism.getProcessHelper().submitAndSchedule(process.toString()); - InstanceUtil.waitTillInstancesAreCreated(clusterOC, process.toString(), 0); - String bundleId = OozieUtil.getLatestBundleID(clusterOC, process.getName(), EntityType.PROCESS); - - //update process to have late data handling - LateProcess lateProcess = new LateProcess(); - lateProcess.setDelay(new Frequency("hours(1)")); - lateProcess.setPolicy(PolicyType.EXP_BACKOFF); - LateInput lateInput = new LateInput(); - lateInput.setInput("inputData"); - lateInput.setWorkflowPath(aggregateWorkflowDir); - lateProcess.getLateInputs().add(lateInput); - process.setLateProcess(lateProcess); - LOGGER.info("Updated process xml: " + Util.prettyPrintXml(process.toString())); - AssertUtil.assertSucceeded(prism.getProcessHelper().update(process.toString(), process.toString())); - - //check that new coordinator was created - String newBundleId = OozieUtil.getLatestBundleID(clusterOC, process.getName(), EntityType.PROCESS); - Assert.assertNotEquals(bundleId, newBundleId, "New Bundle should be created."); - } - - @AfterMethod(alwaysRun = true) - public void tearDown() { - removeTestClassEntities(); - } - -}
