http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java index f299128..2f5dbd9 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java @@ -101,7 +101,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -124,7 +124,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setProcessPeriodicity(1, TimeUnit.minutes); bundles[0].setProcessConcurrency(10); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -148,10 +148,10 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setInputFeedPeriodicity(5, TimeUnit.minutes); bundles[0].setProcessConcurrency(6); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); //create data for first 5 instances, 6th should be non-materialized - String bundleId = InstanceUtil.getSequenceBundleID(clusterOC, processName, + String bundleId = OozieUtil.getSequenceBundleID(clusterOC, processName, EntityType.PROCESS, 0); for(CoordinatorJob c : clusterOC.getBundleJobInfo(bundleId).getCoordinators()) { List<CoordinatorAction> actions = clusterOC.getCoordJobInfo(c.getId()).getActions(); @@ -192,7 +192,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setProcessValidity(startTime, endTime); bundles[0].setProcessConcurrency(6); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); String startTimeRequest = TimeUtil.getTimeWrtSystemTime(-17); String endTimeRequest = TimeUtil.getTimeWrtSystemTime(23); @@ -215,7 +215,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:21Z"); bundles[0].setProcessConcurrency(6); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); String startTime = TimeUtil.getTimeWrtSystemTime(1); String endTime = TimeUtil.getTimeWrtSystemTime(40); InstancesResult r = prism.getProcessHelper() @@ -236,7 +236,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z"); bundles[0].setProcessConcurrency(6); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -258,7 +258,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z"); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 10); @@ -280,7 +280,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -302,7 +302,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); @@ -322,7 +322,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -342,7 +342,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -362,7 +362,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5);
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java index c65461c..f65f9c9 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java @@ -102,14 +102,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass { bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName, start + "&end=2010-01-02T01:16Z"); InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4); - List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName); + List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName); prism.getProcessHelper().getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:11Z"); InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 5, 1, 0); @@ -127,7 +127,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -151,7 +151,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -175,7 +175,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -200,14 +200,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass { String process = bundles[0].getProcessData(); LOGGER.info("process: " + Util.prettyPrintXml(process)); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); InstancesResult r = prism.getProcessHelper() .getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z"); InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3); - List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName); + List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName); prism.getProcessHelper(). getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:11Z"); InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0); @@ -225,14 +225,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass { bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessConcurrency(6); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); InstancesResult r = prism.getProcessHelper() .getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z"); InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3); - List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName); + List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName); prism.getProcessHelper().getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:11Z"); TimeUtil.sleepSeconds(TIMEOUT); @@ -250,13 +250,13 @@ public class ProcessInstanceRerunTest extends BaseTestClass { bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); prism.getProcessHelper().getProcessInstanceKill(processName, start + "&end=2010-01-02T01:01Z"); - String wfID = InstanceUtil.getWorkflows(cluster, processName, Status.KILLED).get(0); + String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.KILLED).get(0); prism.getProcessHelper().getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:01Z"); Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID)); @@ -275,11 +275,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass { bundles[0].setProcessConcurrency(6); bundles[0].submitFeedsScheduleProcess(prism); String process = bundles[0].getProcessData(); - InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - String wfID = InstanceUtil.getWorkflows(cluster, processName, Status.RUNNING, + String wfID = InstanceUtil.getWorkflows(clusterOC, processName, Status.RUNNING, Status.SUCCEEDED).get(0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 0, CoordinatorAction .Status.SUCCEEDED, EntityType.PROCESS); @@ -300,7 +300,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessConcurrency(2); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -308,7 +308,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { start + "&end=2010-01-02T01:06Z"); prism.getProcessHelper().getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:06Z"); - Assert.assertEquals(InstanceUtil.getInstanceStatus(cluster, processName, 0, 1), + Assert.assertEquals(InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 1), CoordinatorAction.Status.SUSPENDED); } @@ -323,11 +323,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass { bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessConcurrency(3); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); - List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName); + List<String> wfIDs = InstanceUtil.getWorkflows(clusterOC, processName); prism.getProcessHelper().getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:11Z&force=true"); InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0); @@ -352,7 +352,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass { CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS); prism.getProcessHelper().getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:11Z"); - s = InstanceUtil.getInstanceStatus(cluster, processName, 0, 0); + s = InstanceUtil.getInstanceStatus(clusterOC, processName, 0, 0); Assert.assertEquals(s, CoordinatorAction.Status.WAITING, "instance should have been in WAITING state"); } http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java index 3893ffe..a2ff993 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java @@ -95,7 +95,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass { @Test(groups = {"singleCluster"}) public void testProcessInstanceResumeOnlyEnd() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -118,7 +118,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass { @Test(groups = {"singleCluster"}) public void testProcessInstanceResumeResumeSome() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -142,7 +142,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass { @Test(groups = {"singleCluster"}) public void testProcessInstanceResumeResumeMany() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -166,7 +166,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass { public void testProcessInstanceResumeSingle() throws Exception { bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 2); @@ -259,7 +259,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass { @Test(groups = {"singleCluster"}) public void testProcessInstanceResumeNonSuspended() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -280,7 +280,7 @@ public class ProcessInstanceResumeTest extends BaseTestClass { @Test(groups = {"singleCluster"}) public void testProcessInstanceResumeLastInstance() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 6, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java index 6003ee0..ee1c5e4 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java @@ -100,7 +100,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass { public void getResumedProcessInstance() throws Exception { bundles[0].setProcessConcurrency(3); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -123,7 +123,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass { public void getSuspendedProcessInstance() throws Exception { bundles[0].setProcessConcurrency(3); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -142,7 +142,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass { @Test(groups = {"singleCluster"}) public void getRunningProcessInstance() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -183,9 +183,9 @@ public class ProcessInstanceRunningTest extends BaseTestClass { @Test(groups = {"singleCluster"}) public void getSucceededProcessInstance() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitForBundleToReachState(cluster, processName, Job.Status.SUCCEEDED); + OozieUtil.waitForBundleToReachState(clusterOC, processName, Job.Status.SUCCEEDED); InstancesResult r = prism.getProcessHelper().getRunningInstance(processName); InstanceUtil.validateSuccessWOInstances(r); } http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java index 635e238..8f177ec 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java @@ -120,11 +120,11 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setProcessPeriodicity(1, TimeUnit.minutes); bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); - String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, Status.RUNNING, EntityType.PROCESS); - List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); + List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T10:20Z"); InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); @@ -146,7 +146,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setProcessPeriodicity(1, TimeUnit.minutes); bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T05:00Z"); AssertUtil.assertSucceeded(r); @@ -164,7 +164,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass { HadoopUtil.deleteDirIfExists(baseTestHDFSDir + "/input", clusterFS); bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z"); InstanceUtil.validateResponse(r, 5, 0, 0, 5, 0); @@ -183,12 +183,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:30Z"); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 5, Status.RUNNING, EntityType.PROCESS); - List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); + List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null); InstanceUtil.validateResponse(r, 6, 5, 0, 1, 0); List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); @@ -206,12 +206,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass { @Test(groups = {"singleCluster"}) public void testProcessInstanceStatusStartAndEnd() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1 , Status.RUNNING, EntityType.PROCESS); - List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); + List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z"); InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); @@ -232,12 +232,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, Status.RUNNING, EntityType.PROCESS, 5); - List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); + List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T00:00Z&end=2010-01-02T01:21Z"); InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING); @@ -288,7 +288,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass { @Test(groups = {"singleCluster"}) public void testProcessInstanceStatusReverseDateRange() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1, Status.RUNNING, EntityType.PROCESS); @@ -311,12 +311,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessConcurrency(2); bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2, Status.RUNNING, EntityType.PROCESS, 5); - List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); + List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z"); InstanceUtil.validateResponse(r, 5, 2, 0, 3, 0); @@ -337,7 +337,7 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); String process = bundles[0].getProcessData(); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, Status.RUNNING, EntityType.PROCESS, 5); @@ -364,12 +364,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass { @Test(groups = {"singleCluster"}) public void testProcessInstanceStatusOnlyStart() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, Status.RUNNING, EntityType.PROCESS, 5); - List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); + List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, "?start=2010-01-02T01:00Z"); InstanceUtil.validateResponse(r, 5, 1, 0, 4, 0); @@ -404,12 +404,12 @@ public class ProcessInstanceStatusTest extends BaseTestClass { bundles[0].setProcessConcurrency(5); bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); bundles[0].submitFeedsScheduleProcess(prism); - String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, Status.RUNNING, EntityType.PROCESS, 5); - List<String> oozieWfIDs = OozieUtil.getWorkflow(cluster, bundleId); + List<String> oozieWfIDs = OozieUtil.getWorkflow(clusterOC, bundleId); InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null); InstanceUtil.validateResponse(r, 5, 5, 0, 0, 0); List<String> instanceWfIDs = InstanceUtil.getWorkflowJobIds(r); http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java index b7fed18..f673314 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java @@ -86,7 +86,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -111,7 +111,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z"); bundles[0].setProcessConcurrency(1); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); @@ -131,7 +131,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -170,7 +170,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); bundles[0].setProcessConcurrency(3); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); @@ -249,7 +249,7 @@ public class ProcessInstanceSuspendTest extends BaseTestClass { bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z"); bundles[0].setProcessConcurrency(5); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 5, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java index 40a4ad2..aef32bf 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java @@ -91,7 +91,7 @@ public class ProcessLateRerunTest extends BaseTestClass { bundles[0].submitAndScheduleProcess(); AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); TimeUtil.sleepSeconds(10); - InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0); getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 1); @@ -107,7 +107,7 @@ public class ProcessLateRerunTest extends BaseTestClass { bundles[0].getProcessName(), EntityType.PROCESS); String bundleID = bundleList.get(0); - OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1); + OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1); } /** @@ -131,7 +131,7 @@ public class ProcessLateRerunTest extends BaseTestClass { bundles[0].submitAndScheduleProcess(); AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); TimeUtil.sleepSeconds(10); - InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0); getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, true, 1); @@ -147,7 +147,7 @@ public class ProcessLateRerunTest extends BaseTestClass { bundles[0].getProcessName(), EntityType.PROCESS); String bundleID = bundleList.get(0); - OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1); + OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1); } /** @@ -175,7 +175,7 @@ public class ProcessLateRerunTest extends BaseTestClass { AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); TimeUtil.sleepSeconds(10); - InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0); getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 3); @@ -191,7 +191,7 @@ public class ProcessLateRerunTest extends BaseTestClass { bundles[0].getProcessName(), EntityType.PROCESS); String bundleID = bundleList.get(0); - OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1); + OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 1); } /** @@ -231,7 +231,7 @@ public class ProcessLateRerunTest extends BaseTestClass { AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); TimeUtil.sleepSeconds(10); - InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0); getAndCreateDependencies(cluster1, bundles[0], cluster1OC, cluster1FS, false, 7); @@ -248,7 +248,7 @@ public class ProcessLateRerunTest extends BaseTestClass { bundles[0].getProcessName(), EntityType.PROCESS); String bundleID = bundleList.get(0); - OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 0); + OozieUtil.validateRetryAttempts(cluster1OC, bundleID, EntityType.PROCESS, 0); } /* @@ -271,10 +271,10 @@ public class ProcessLateRerunTest extends BaseTestClass { Assert.assertTrue(bundles != null && bundles.size() > 0, "Bundle job not created."); String bundleID = bundles.get(0); LOGGER.info("bundle id: " + bundleID); - List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper, bundleID); + List<String> missingDependencies = OozieUtil.getMissingDependencies(oozieClient, bundleID); for (int i = 0; i < 10 && missingDependencies == null; ++i) { TimeUtil.sleepSeconds(30); - missingDependencies = OozieUtil.getMissingDependencies(prismHelper, bundleID); + missingDependencies = OozieUtil.getMissingDependencies(oozieClient, bundleID); } Assert.assertNotNull(missingDependencies, "Missing dependencies not found."); http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java index 74903e1..3988ae9 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathLoadTest.java @@ -28,6 +28,7 @@ import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; import org.apache.oozie.client.Job.Status; +import org.apache.oozie.client.OozieClient; import org.testng.Assert; import org.testng.annotations.*; @@ -43,8 +44,8 @@ import java.util.Map; @Test(groups = "embedded") public class ProcessLibPathLoadTest extends BaseTestClass { - private ColoHelper cluster = servers.get(0); + private OozieClient clusterOC = serverOC.get(0); private FileSystem clusterFS = serverFS.get(0); private String testDir = cleanAndGetTestDir(); private String aggregateWorkflowDir = testDir + "/aggregator"; @@ -100,9 +101,9 @@ public class ProcessLibPathLoadTest extends BaseTestClass { public void setRightJarInWorkflowLib() throws Exception { bundles[0].setProcessWorkflow(aggregateWorkflowDir); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED); + OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED); } /** @@ -112,13 +113,13 @@ public class ProcessLibPathLoadTest extends BaseTestClass { * @throws Exception */ @Test - public void setNoJarInWorkflowLibLocaltion() throws Exception { + public void setNoJarInWorkflowLibLocation() throws Exception { HadoopUtil.deleteDirIfExists(aggregateWorkflowDir + "/lib/" + oozieLibName, clusterFS); bundles[0].setProcessWorkflow(aggregateWorkflowDir); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitForBundleToReachState(cluster, processName, Status.KILLED); + OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.KILLED); } /** @@ -149,7 +150,6 @@ public class ProcessLibPathLoadTest extends BaseTestClass { output.write(buffer, 0, n); } output.close(); - } private static boolean isRedirected(Map<String, List<String>> header) { http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java index bc2978f..dc2fc37 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java @@ -33,6 +33,7 @@ import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; import org.apache.oozie.client.Job.Status; +import org.apache.oozie.client.OozieClient; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -47,6 +48,7 @@ import java.util.List; public class ProcessLibPathTest extends BaseTestClass { private ColoHelper cluster = servers.get(0); + private OozieClient clusterOC = serverOC.get(0); private FileSystem clusterFS = serverFS.get(0); private String testDir = cleanAndGetTestDir(); private String testLibDir = testDir + "/TestLib"; @@ -102,9 +104,9 @@ public class ProcessLibPathTest extends BaseTestClass { bundles[0].setProcessWorkflow(workflowDir); LOGGER.info("processData: " + Util.prettyPrintXml(process)); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED); + OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED); } /** @@ -122,8 +124,8 @@ public class ProcessLibPathTest extends BaseTestClass { bundles[0].setProcessWorkflow(workflowDir); LOGGER.info("processData: " + Util.prettyPrintXml(process)); bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, process, 0); OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); - InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED); + OozieUtil.waitForBundleToReachState(clusterOC, processName, Status.SUCCEEDED); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java index 3057166..f67cbf8 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/TouchAPIPrismAndServerTest.java @@ -25,7 +25,6 @@ import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.response.ServiceResponse; import org.apache.falcon.regression.core.util.*; import org.apache.falcon.regression.testHelper.BaseTestClass; -import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; import org.apache.oozie.client.Job; import org.apache.oozie.client.OozieClient; @@ -44,12 +43,11 @@ import javax.xml.bind.JAXBException; public class TouchAPIPrismAndServerTest extends BaseTestClass { private ColoHelper cluster = servers.get(0); private OozieClient clusterOC = serverOC.get(0); - private FileSystem clusterFS = serverFS.get(0); private String aggregateWorkflowDir = cleanAndGetTestDir() + "/aggregator"; - private String feed; private static final Logger LOGGER = Logger.getLogger(TouchAPIPrismAndServerTest.class); private String startTime; private String endTime; + private String clusterName; @BeforeClass(alwaysRun = true) public void uploadWorkflow() throws Exception { @@ -68,6 +66,7 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass { bundles[0].setProcessValidity(startTime, endTime); bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes); bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes); + clusterName = Util.readEntityName(bundles[0].getDataSets().get(0)); } @AfterMethod(alwaysRun = true) @@ -86,22 +85,23 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass { public void touchProcessSchedule() throws Exception { bundles[0].submitFeedsScheduleProcess(prism); AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); - String coordId = InstanceUtil.getLatestCoordinatorID(clusterOC, - bundles[0].getProcessName(), EntityType.PROCESS); - String oldbundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + String coordId = OozieUtil.getLatestCoordinatorID(clusterOC, + bundles[0].getProcessName(), EntityType.PROCESS); + String oldbundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), + EntityType.PROCESS); // via prism ServiceResponse response = prism.getProcessHelper().touchEntity(bundles[0].getProcessData()); - String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated."); validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId); // via server oldbundleId = bundleId; - coordId = InstanceUtil.getLatestCoordinatorID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + coordId = OozieUtil.getLatestCoordinatorID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); response = cluster.getProcessHelper().touchEntity(bundles[0].getProcessData()); - bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated."); validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId); } @@ -117,27 +117,22 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass { public void touchFeedSchedule() throws Exception { bundles[0].submitAndScheduleFeed(); AssertUtil.checkStatus(clusterOC, EntityType.FEED, bundles[0], Job.Status.RUNNING); - String coordId = InstanceUtil.getLatestCoordinatorID(clusterOC, - Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED); - String oldbundleId = InstanceUtil.getLatestBundleID(cluster, - Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED); + String coordId = OozieUtil.getLatestCoordinatorID(clusterOC, clusterName, EntityType.FEED); + String oldbundleId = OozieUtil.getLatestBundleID(clusterOC, clusterName, EntityType.FEED); // via prism TimeUtil.sleepSeconds(60); ServiceResponse response = prism.getFeedHelper().touchEntity(bundles[0].getDataSets().get(0)); - String bundleId = InstanceUtil.getLatestBundleID(cluster, Util.readEntityName(bundles[0].getDataSets().get(0)), - EntityType.FEED); + String bundleId = OozieUtil.getLatestBundleID(clusterOC, clusterName, EntityType.FEED); Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated."); validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId); // via server oldbundleId = bundleId; - coordId = InstanceUtil.getLatestCoordinatorID(clusterOC, Util.readEntityName(bundles[0].getDataSets().get(0)), - EntityType.FEED); + coordId = OozieUtil.getLatestCoordinatorID(clusterOC, clusterName, EntityType.FEED); TimeUtil.sleepSeconds(60); response = cluster.getFeedHelper().touchEntity(bundles[0].getDataSets().get(0)); - bundleId = InstanceUtil.getLatestBundleID(cluster, Util.readEntityName(bundles[0].getDataSets().get(0)), - EntityType.FEED); + bundleId = OozieUtil.getLatestBundleID(clusterOC, clusterName, EntityType.FEED); Assert.assertNotEquals(oldbundleId, bundleId, "Bundle ids are same. No new bundle generated."); validate(response, "Old bundle id: " + coordId + ". New bundle id: " + bundleId); @@ -157,21 +152,20 @@ public class TouchAPIPrismAndServerTest extends BaseTestClass { bundles[0].setProcessValidity(startTime, endTime); bundles[0].submitFeedsScheduleProcess(prism); AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); - String coordId = InstanceUtil.getLatestCoordinatorID(clusterOC, - bundles[0].getProcessName(), EntityType.PROCESS); - String oldbundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); + String coordId = OozieUtil.getLatestCoordinatorID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); + String oldbundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); // via prism ServiceResponse response = prism.getProcessHelper().touchEntity(bundles[0].getProcessData()); - String bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + String bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(oldbundleId, bundleId, "New bundle generated"); validate(response, "Old bundle id: " + coordId); //via server oldbundleId = bundleId; response = cluster.getProcessHelper().touchEntity(bundles[0].getProcessData()); - bundleId = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); + bundleId = OozieUtil.getLatestBundleID(clusterOC, bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(oldbundleId, bundleId, "New bundle generated"); validate(response, "Old bundle id: " + coordId); } http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java index 9799a1c..67cbc52 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java @@ -32,8 +32,8 @@ import org.apache.falcon.regression.core.util.AssertUtil; import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.HCatUtil; import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.hive.hcatalog.api.HCatClient; import org.apache.hive.hcatalog.api.HCatCreateTableDesc; @@ -212,9 +212,7 @@ public class HCatFeedOperationsTest extends BaseTestClass { .build()).toString(); AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), - "REPLICATION"), 1); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1); //This test doesn't wait for replication to succeed. } http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java index 53e4777..cd1b538 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java @@ -33,6 +33,7 @@ import org.apache.falcon.regression.core.util.HCatUtil; import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; @@ -113,7 +114,6 @@ public class HCatReplicationTest extends BaseTestClass { bundles[2].generateUniqueBundle(this); bundles[2].setClusterInterface(Interfacetype.REGISTRY, cluster3.getClusterHelper() .getHCatEndpoint()); - } @DataProvider @@ -189,9 +189,7 @@ public class HCatReplicationTest extends BaseTestClass { AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); TimeUtil.sleepSeconds(TIMEOUT); //check if all coordinators exist - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), - "REPLICATION"), 1); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1); //replication should start, wait while it ends // we will check for 2 instances so that both partitions are copied over. @@ -206,7 +204,6 @@ public class HCatReplicationTest extends BaseTestClass { .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testHdfsDir)); LOGGER.info("Data on target cluster: " + cluster2ReplicatedData); AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData); - } // make sure oozie changes mentioned FALCON-389 are done on the clusters. Otherwise the test @@ -285,14 +282,10 @@ public class HCatReplicationTest extends BaseTestClass { AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); TimeUtil.sleepSeconds(TIMEOUT); //check if all coordinators exist - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), - "REPLICATION"), 1); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1); //check if all coordinators exist - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed), - "REPLICATION"), 1); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, Util.readEntityName(feed), "REPLICATION"), 1); //replication should start, wait while it ends // we will check for 2 instances so that both partitions are copied over. @@ -318,7 +311,6 @@ public class HCatReplicationTest extends BaseTestClass { AssertUtil.checkForListSizes(srcData, cluster3TargetData); } - private void addPartitionsToTable(List<String> partitions, List<String> partitionLocations, String partitionCol, String databaseName, String tableName, HCatClient hc) throws http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java index 7da8ef1..67b80d8 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java @@ -66,7 +66,6 @@ import java.util.List; public class EntitySummaryTest extends BaseTestClass { private static final Logger LOGGER = Logger.getLogger(EntitySummaryTest.class); private ColoHelper cluster1 = servers.get(0); - private ColoHelper cluster2 = servers.get(1); private OozieClient cluster1OC = serverOC.get(0); private OozieClient cluster2OC = serverOC.get(1); private FileSystem cluster1FS = serverFS.get(0); @@ -112,7 +111,7 @@ public class EntitySummaryTest extends BaseTestClass { bundles[0].submitClusters(prism); bundles[0].submitFeeds(prism); String clusterName = Util.readEntityName(bundles[0].getClusters().get(0)); - List<String> processes = scheduleEntityValidateWaitingInstances(cluster1, + List<String> processes = scheduleEntityValidateWaitingInstances(cluster1OC, bundles[0].getProcessData(), EntityType.PROCESS, clusterName); //create data for processes to run and wait some time for instances to make progress @@ -161,8 +160,7 @@ public class EntitySummaryTest extends BaseTestClass { AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster2Def)); //submit and schedule 7 feeds, check that 7 waiting instances are present for each feed - List<String> feeds = scheduleEntityValidateWaitingInstances(cluster2, feed, - EntityType.FEED, clusterName); + List<String> feeds = scheduleEntityValidateWaitingInstances(cluster2OC, feed, EntityType.FEED, clusterName); //create data for processes to run and wait some time for instances to make progress List<String> folders = TimeUtil.getMinuteDatesOnEitherSide(TimeUtil.oozieDateToDate( @@ -180,9 +178,8 @@ public class EntitySummaryTest extends BaseTestClass { * Schedules 7 entities and checks that summary reflects info about the most recent 7 * instances of each of them. */ - private List<String> scheduleEntityValidateWaitingInstances(ColoHelper cluster, String entity, - EntityType entityType, - String clusterName) + private List<String> scheduleEntityValidateWaitingInstances(OozieClient oozieClient, String entity, + EntityType entityType, String clusterName) throws AuthenticationException, IOException, URISyntaxException, JAXBException, OozieClientException, InterruptedException { String entityName = Util.readEntityName(entity); @@ -203,8 +200,8 @@ public class EntitySummaryTest extends BaseTestClass { } entity = entityMerlin.toString(); AssertUtil.assertSucceeded(helper.submitAndSchedule(entity)); - InstanceUtil.waitTillInstancesAreCreated(cluster, entity, 0); - InstanceUtil.waitTillInstanceReachState(cluster.getClusterHelper().getOozieClient(), + InstanceUtil.waitTillInstancesAreCreated(oozieClient, entity, 0); + InstanceUtil.waitTillInstanceReachState(oozieClient, uniqueName, 7, CoordinatorAction.Status.WAITING, entityType); //check that summary shows recent (i) number of feeds and their instances http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java index 64a7e2e..6d41f9e 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/LineageApiProcessInstanceTest.java @@ -28,7 +28,6 @@ import org.apache.falcon.regression.core.response.lineage.VerticesResult; import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.GraphAssert; import org.apache.falcon.regression.core.util.HadoopUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.TimeUtil; @@ -37,6 +36,7 @@ import org.apache.falcon.resource.InstancesResult; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; import org.apache.oozie.client.Job; +import org.apache.oozie.client.OozieClient; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -55,6 +55,7 @@ public class LineageApiProcessInstanceTest extends BaseTestClass { private ColoHelper cluster = servers.get(0); private FileSystem clusterFS = serverFS.get(0); + private OozieClient clusterOC = serverOC.get(0); private LineageHelper lineageHelper; private String baseTestHDFSDir = cleanAndGetTestDir(); private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator"; @@ -104,7 +105,7 @@ public class LineageApiProcessInstanceTest extends BaseTestClass { outputFeedName = bundles[0].getOutputFeedNameFromBundle(); Job.Status status = null; for (int i = 0; i < 20; i++) { - status = InstanceUtil.getDefaultCoordinatorStatus(cluster, bundles[0].getProcessName(), 0); + status = OozieUtil.getDefaultCoordinatorStatus(clusterOC, bundles[0].getProcessName(), 0); if (status == Job.Status.SUCCEEDED || status == Job.Status.KILLED) { break; } http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java index 148d6ea..8ef6bb6 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java @@ -123,7 +123,7 @@ public class ListFeedInstancesTest extends BaseTestClass { //submit and schedule feed AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); - InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed, 0); InstanceUtil.waitTillInstanceReachState(cluster2OC, feedName, 12, CoordinatorAction.Status.WAITING, EntityType.FEED); http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java index 66d1886..93e9a3e 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java @@ -77,7 +77,7 @@ public class ListProcessInstancesTest extends BaseTestClass { bundles[0].setProcessConcurrency(3); bundles[0].submitAndScheduleProcess(); processName = bundles[0].getProcessName(); - InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); //create data for processes to run and wait some time for instances to make progress OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java index a72d339..99f58e6 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/FeedDelayTest.java @@ -130,18 +130,17 @@ public class FeedDelayTest extends BaseTestClass { AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); //check if coordinator exists - InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), "REPLICATION"), 1); + InstanceUtil.waitTillInstancesAreCreated(cluster2OC, feed, 0); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "REPLICATION"), 1); //Finding bundleId of replicated instance on target - String bundleId = InstanceUtil.getLatestBundleID(cluster2, Util.readEntityName(feed), EntityType.FEED); + String bundleId = OozieUtil.getLatestBundleID(cluster2OC, Util.readEntityName(feed), EntityType.FEED); //Finding startTime of replicated instance on target - String startTimeO0zie = OozieUtil.getCoordStartTime(cluster2, feed, 0); + String startTimeO0zie = OozieUtil.getCoordStartTime(cluster2OC, feed, 0); String startTimeExpected = getStartTime(sourceStartTime, targetStartTime, new Frequency(sourceDelay), flag); - List<String> missingDep = getAndCreateDependencies(cluster1, cluster1FS, cluster2, bundleId); + List<String> missingDep = getAndCreateDependencies(cluster1FS, cluster1.getPrefix(), cluster2OC, bundleId); List<String> qaDep = new ArrayList<String>(); if (flag) { @@ -176,25 +175,24 @@ public class FeedDelayTest extends BaseTestClass { }; } - private List<String> getAndCreateDependencies(ColoHelper prismHelper1, FileSystem clusterFS1, - ColoHelper prismHelper2, String bundleId) throws OozieClientException, IOException { - List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId); + private List<String> getAndCreateDependencies(FileSystem sourceFS, String sourcePrefix, OozieClient targetOC, + String bundleId) throws OozieClientException, IOException { + List<String> missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId); for (int i = 0; i < 10 && missingDependencies == null; ++i) { TimeUtil.sleepSeconds(30); LOGGER.info("sleeping..."); - missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId); + missingDependencies = OozieUtil.getMissingDependencies(targetOC, bundleId); } Assert.assertNotNull(missingDependencies, "Missing dependencies not found."); // Creating missing dependencies - HadoopUtil.createHDFSFolders(prismHelper1, missingDependencies); + HadoopUtil.createFolders(sourceFS, sourcePrefix, missingDependencies); //Adding data to empty folders for (String location : missingDependencies) { LOGGER.info("Transferring data to : " + location); - HadoopUtil.copyDataToFolder(clusterFS1, location, OSUtil.RESOURCES + "feed-s4Replication.xml"); + HadoopUtil.copyDataToFolder(sourceFS, location, OSUtil.RESOURCES + "feed-s4Replication.xml"); } - return missingDependencies; }
