http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java index 3f6dc66..a5a89d7 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java @@ -129,13 +129,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //now to schedule in 1 colo and let it remain in another AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); ProcessMerlin updatedProcess = new ProcessMerlin(bundles[1].getProcessObject()); @@ -159,11 +160,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, false); waitingForBundleFinish(cluster3, oldBundleId, 5); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, true); } @@ -182,12 +183,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(30); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); List<String> oldNominalTimes = - OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); + OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() @@ -201,7 +202,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { bundles[1].setProcessValidity(newStartTime, newEndTime); bundles[1].setProcessConcurrency(10); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); LOGGER.info("updated process: " + Util.prettyPrintXml(bundles[1].getProcessData())); while (Util.parseResponse( @@ -213,7 +214,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(10); } - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, false); dualComparison(prism, cluster3, bundles[1].getProcessData()); @@ -222,7 +223,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. int finalNumberOfInstances = - InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, + InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS).size(); Assert.assertEquals(finalNumberOfInstances, getExpectedNumberOfWorkflowInstances(TimeUtil @@ -240,7 +241,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { .dateToOozieDate( bundles[1].getProcessObject().getClusters().getClusters().get(0) .getValidity().getEnd())); - Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId), + Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId), expectedNumberOfWorkflows); } @@ -252,9 +253,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //now to schedule in 1 colo and let it remain in another AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); TimeUtil.sleepSeconds(25); int initialConcurrency = bundles[1].getProcessObject().getParallel(); @@ -285,7 +286,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); while (Util.parseResponse( prism.getProcessHelper() .update(bundles[1].getProcessData(), bundles[1].getProcessData())) @@ -302,7 +303,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { .checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); waitingForBundleFinish(cluster3, oldBundleId); int finalNumberOfInstances = - InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, + InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS).size(); int expectedInstances = @@ -328,13 +329,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //now to schedule in 1 colo and let it remain in another AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData())); @@ -347,7 +348,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { ServiceResponse response = prism.getProcessHelper().update(bundles[1].getProcessData(), updatedProcess.toString()); AssertUtil.assertSucceeded(response); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, false); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); @@ -366,15 +367,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); String originalProcessData = bundles[1].getProcessData(); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); TimeUtil.sleepSeconds(20); List<String> oldNominalTimes = - OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); + OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); bundles[1].setProcessName(this.getClass().getSimpleName() + "-myNewProcessName"); //now to update @@ -382,7 +383,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { prism.getProcessHelper() .update((bundles[1].getProcessData()), bundles[1].getProcessData()); AssertUtil.assertFailed(response); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, originalProcessData, false, false); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); } @@ -400,18 +401,18 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); //now to update DateTime updateTime = new DateTime(DateTimeZone.UTC); TimeUtil.sleepSeconds(60); List<String> oldNominalTimes = - OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); + OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); LOGGER.info("updating at " + updateTime); while (Util .parseResponse(updateProcessConcurrency(bundles[1], @@ -428,7 +429,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated // correctly. - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), false, true); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); @@ -443,7 +444,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { && status != Job.Status.DONEWITHERROR) { int statusCount = InstanceUtil - .getInstanceCountWithStatus(cluster3, + .getInstanceCountWithStatus(cluster3OC, bundles[1].getProcessName(), org.apache.oozie.client.CoordinatorAction.Status.RUNNING, EntityType.PROCESS); @@ -468,7 +469,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { bundles[1].getProcessObject().getClusters().getClusters() .get(0).getValidity() .getEnd())); - Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId), + Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId), expectedNumberOfInstances); } @@ -484,11 +485,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(30); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( @@ -500,7 +501,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { .getStart()), newEndTime); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); ServiceResponse response = prism.getProcessHelper() .update(bundles[1].getProcessData(), bundles[1].getProcessData()); @@ -514,12 +515,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { Assert.assertEquals(Util.parseResponse(response).getStatus(), APIResult.Status.SUCCEEDED, "Process update did not succeed."); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), false, true); int i = 0; - while (OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId) + while (OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId) != getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() .getStart() @@ -540,7 +541,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { // one is updated // correctly. int finalNumberOfInstances = InstanceUtil - .getProcessInstanceList(cluster3, + .getProcessInstanceList(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS) .size(); Assert.assertEquals(finalNumberOfInstances, @@ -567,15 +568,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //now to schedule in 1 colo and let it remain in another AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); AssertUtil.assertSucceeded( @@ -595,7 +596,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), false, true); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData())); @@ -610,7 +611,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { && status != Job.Status.DONEWITHERROR) { if (InstanceUtil - .getInstanceCountWithStatus(cluster3, + .getInstanceCountWithStatus(cluster3OC, bundles[1].getProcessName(), org.apache.oozie.client.CoordinatorAction.Status.RUNNING, EntityType.PROCESS) @@ -627,7 +628,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { waitingForBundleFinish(cluster3, oldBundleId); int finalNumberOfInstances = - InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, + InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS).size(); int expectedInstances = @@ -660,11 +661,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); //now to update @@ -707,7 +708,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { && status != Job.Status.DONEWITHERROR) { if (InstanceUtil - .getInstanceCountWithStatus(cluster3, + .getInstanceCountWithStatus(cluster3OC, bundles[1].getProcessName(), org.apache.oozie.client.CoordinatorAction.Status.RUNNING, EntityType.PROCESS) @@ -721,9 +722,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(30); } Assert.assertTrue(doesExist, "Er! The desired concurrency levels are never reached!!!"); - OozieUtil.verifyNewBundleCreation(cluster3, InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS), + OozieUtil.verifyNewBundleCreation(cluster3OC, OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS), oldNominalTimes, bundles[1].getProcessData(), false, true ); @@ -731,7 +732,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { waitingForBundleFinish(cluster3, oldBundleId); int finalNumberOfInstances = - InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, + InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS).size(); int expectedInstances = @@ -761,11 +762,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(30); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); int initialConcurrency = bundles[1].getProcessObject().getParallel(); @@ -793,11 +794,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. waitingForBundleFinish(cluster3, oldBundleId); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, true); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); int finalNumberOfInstances = - InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, + InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS).size(); int expectedInstances = getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( @@ -826,13 +827,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(30); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); int initialConcurrency = bundles[1].getProcessObject().getParallel(); @@ -864,11 +865,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. waitingForBundleFinish(cluster3, oldBundleId); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, true); AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); int finalNumberOfInstances = - InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, + InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS).size(); int expectedInstances = @@ -897,13 +898,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(30); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); TimeUtil.sleepSeconds(20); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2"; @@ -921,7 +922,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { .getStatus() != APIResult.Status.SUCCEEDED) { TimeUtil.sleepSeconds(20); } - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, false); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); @@ -931,7 +932,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. waitingForBundleFinish(cluster3, oldBundleId); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, true); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); } @@ -947,14 +948,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); TimeUtil.sleepSeconds(30); - InstanceUtil.waitTillInstancesAreCreated(cluster3, bundles[1].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2"; @@ -974,7 +975,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(10); } - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, false); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); @@ -986,7 +987,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. waitingForBundleFinish(cluster3, oldBundleId); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, true); AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); @@ -1012,12 +1013,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(30); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, originalProcess, 0, 10); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - Util.readEntityName(originalProcess), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + Util.readEntityName(originalProcess), EntityType.PROCESS); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, originalProcess, 0, 10); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); //submit new feed @@ -1037,12 +1038,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), false, false); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); while (Util.parseResponse( prism.getProcessHelper().update(updatedProcess, updatedProcess)).getStatus() @@ -1053,13 +1054,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { dualComparison(prism, cluster3, bundles[1].getProcessData()); Assert.assertTrue(Util.isDefinitionSame(cluster2, prism, originalProcess)); bundles[1].verifyDependencyListing(cluster2); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, updatedProcess, true, false); waitingForBundleFinish(cluster3, oldBundleId); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, true); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); @@ -1074,13 +1075,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(30); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( @@ -1098,7 +1099,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { LOGGER.info("update didnt SUCCEED in last attempt"); TimeUtil.sleepSeconds(10); } - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), false, true); bundles[1].verifyDependencyListing(cluster2); @@ -1108,7 +1109,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. int finalNumberOfInstances = InstanceUtil - .getProcessInstanceList(cluster3, + .getProcessInstanceList(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS) .size(); Assert.assertEquals(finalNumberOfInstances, @@ -1123,7 +1124,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { bundles[1].getProcessObject().getClusters().getClusters().get(0) .getValidity().getStart()), newEndTime); - Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId), + Assert.assertEquals(OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId), expectedNumberOfWorkflows); } @@ -1137,12 +1138,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //now to schedule in 1 colo and let it remain in another AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); TimeUtil.sleepSeconds(30); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); String newEndTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() @@ -1171,7 +1172,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. int finalNumberOfInstances = InstanceUtil - .getProcessInstanceList(cluster3, + .getProcessInstanceList(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS) .size(); Assert.assertEquals(finalNumberOfInstances, @@ -1204,13 +1205,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); List<String> oldNominalTimes = - OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); + OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData())); @@ -1232,7 +1233,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated // correctly. - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, true); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); } @@ -1255,12 +1256,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(30); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); LOGGER.info("original process: " + Util.prettyPrintXml(bundles[1].getProcessData())); @@ -1281,7 +1282,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, true); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); } @@ -1296,11 +1297,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(30); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3, oldBundleId, + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( @@ -1312,13 +1313,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { .getEnd() )); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); OozieUtil.createMissingDependencies(cluster3, EntityType.PROCESS, bundles[1].getProcessName(), 0); AssertUtil.assertSucceeded( prism.getProcessHelper() .update(bundles[1].getProcessData(), bundles[1].getProcessData())); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, true); bundles[1].verifyDependencyListing(cluster2); dualComparison(prism, cluster3, bundles[1].getProcessData()); @@ -1331,12 +1332,12 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //now to schedule in 1 colo and let it remain in another AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); TimeUtil.sleepSeconds(30); - OozieUtil.getNumberOfWorkflowInstances(cluster3, oldBundleId); + OozieUtil.getNumberOfWorkflowInstances(cluster3OC, oldBundleId); String oldStartTime = TimeUtil.dateToOozieDate( bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity() .getStart() @@ -1350,7 +1351,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { .getEnd() )); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); AssertUtil.assertSucceeded( cluster3.getProcessHelper().suspend(bundles[1].getProcessData())); @@ -1367,14 +1368,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. int finalNumberOfInstances = - InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, bundles[1].getProcessName(), + InstanceUtil.getProcessInstanceListFromAllBundles(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS).size(); Assert.assertEquals(finalNumberOfInstances, getExpectedNumberOfWorkflowInstances(oldStartTime, bundles[1].getProcessObject().getClusters().getClusters().get(0) .getValidity().getEnd())); Assert.assertEquals(InstanceUtil - .getProcessInstanceList(cluster3, + .getProcessInstanceList(cluster3OC, bundles[1].getProcessName(), EntityType.PROCESS) .size(), getExpectedNumberOfWorkflowInstances(newStartTime, bundles[1].getProcessObject().getClusters().getClusters().get(0).getValidity().getEnd())); @@ -1389,9 +1390,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //now to schedule in 1 colo and let it remain in another AssertUtil.assertSucceeded( cluster3.getProcessHelper().schedule(bundles[1].getProcessData())); - String oldBundleId = InstanceUtil - .getLatestBundleID(cluster3, - bundles[1].getProcessName(), EntityType.PROCESS); + String oldBundleId = OozieUtil + .getLatestBundleID(cluster3OC, + bundles[1].getProcessName(), EntityType.PROCESS); TimeUtil.sleepSeconds(30); String newStartTime = TimeUtil.addMinsToTime(TimeUtil.dateToOozieDate( @@ -1404,7 +1405,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { )); InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 0, 10); - waitForProcessToReachACertainState(cluster3, bundles[1], Job.Status.RUNNING); + waitForProcessToReachACertainState(cluster3OC, bundles[1], Job.Status.RUNNING); AssertUtil.assertSucceeded( cluster3.getProcessHelper().suspend(bundles[1].getProcessData())); @@ -1413,9 +1414,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { .update(bundles[1].getProcessData(), bundles[1].getProcessData())); AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData())); List<String> oldNominalTimes = - OozieUtil.getActionsNominalTime(cluster3, oldBundleId, EntityType.PROCESS); + OozieUtil.getActionsNominalTime(cluster3OC, oldBundleId, EntityType.PROCESS); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleId, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleId, oldNominalTimes, bundles[1].getProcessData(), true, false); bundles[1].verifyDependencyListing(cluster2); @@ -1448,11 +1449,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { EntityType.PROCESS); //save old data - String oldBundleID = InstanceUtil - .getLatestBundleID(cluster1, - Util.readEntityName(b.getProcessData()), EntityType.PROCESS); + String oldBundleID = OozieUtil + .getLatestBundleID(cluster1OC, + Util.readEntityName(b.getProcessData()), EntityType.PROCESS); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1, + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1OC, oldBundleID, EntityType.PROCESS); @@ -1467,7 +1468,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(20); //verify new bundle creation - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleID, oldNominalTimes, b.getProcessData(), true, true); } finally { @@ -1529,26 +1530,26 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { } - private void waitForProcessToReachACertainState(ColoHelper coloHelper, Bundle bundle, + private void waitForProcessToReachACertainState(OozieClient oozieClient, Bundle bundle, Job.Status state) throws Exception { - while (OozieUtil.getOozieJobStatus(coloHelper.getFeedHelper().getOozieClient(), + while (OozieUtil.getOozieJobStatus(oozieClient, bundle.getProcessName(), EntityType.PROCESS) != state) { //keep waiting TimeUtil.sleepSeconds(10); } //now check if the coordinator is in desired state - CoordinatorJob coord = getDefaultOozieCoord(coloHelper, InstanceUtil - .getLatestBundleID(coloHelper, Util.readEntityName(bundle.getProcessData()), - EntityType.PROCESS)); + CoordinatorJob coord = getDefaultOozieCoord(oozieClient, OozieUtil + .getLatestBundleID(oozieClient, bundle.getProcessName(), + EntityType.PROCESS)); while (coord.getStatus() != state) { TimeUtil.sleepSeconds(10); - coord = getDefaultOozieCoord(coloHelper, InstanceUtil - .getLatestBundleID(coloHelper, bundle.getProcessName(), - EntityType.PROCESS)); + coord = getDefaultOozieCoord(oozieClient, OozieUtil + .getLatestBundleID(oozieClient, bundle.getProcessName(), + EntityType.PROCESS)); } } @@ -1630,14 +1631,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { waitingForBundleFinish(coloHelper, bundleId, 15); } - private CoordinatorJob getDefaultOozieCoord(ColoHelper coloHelper, String bundleId) - throws Exception { - OozieClient client = coloHelper.getFeedHelper().getOozieClient(); - BundleJob bundlejob = client.getBundleJobInfo(bundleId); - + private CoordinatorJob getDefaultOozieCoord(OozieClient oozieClient, String bundleId) throws Exception { + BundleJob bundlejob = oozieClient.getBundleJobInfo(bundleId); for (CoordinatorJob coord : bundlejob.getCoordinators()) { if (coord.getAppName().contains("DEFAULT")) { - return client.getCoordJobInfo(coord.getId()); + return oozieClient.getCoordJobInfo(coord.getId()); } } return null;
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java index 39f0268..944c67f 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java @@ -284,7 +284,7 @@ public class OptionalInputTest extends BaseTestClass { prism.getProcessHelper().update(process.toString(), process.toString()); //from now on ... it should wait of input0 also - InstanceUtil.waitTillInstancesAreCreated(cluster, process.toString(), 0); + InstanceUtil.waitTillInstancesAreCreated(serverOC.get(0), process.toString(), 0); InstanceUtil.waitTillInstanceReachState(oozieClient, processName, 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS, 10); HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java index 4221525..1bc4027 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java @@ -28,11 +28,13 @@ import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowJob; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -51,6 +53,7 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { private ColoHelper cluster1 = servers.get(0); private ColoHelper cluster2 = servers.get(1); private ColoHelper cluster3 = servers.get(2); + private OozieClient cluster1OC = serverOC.get(0); private FileSystem cluster1FS = serverFS.get(0); private FileSystem cluster2FS = serverFS.get(1); private FileSystem cluster3FS = serverFS.get(2); @@ -124,23 +127,22 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { .withPartition("UK/${cluster.colo}") .build()).toString(); - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); prism.getFeedHelper().submitAndSchedule(feed); TimeUtil.sleepSeconds(10); String bundleId = - InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED); + OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED); //wait till 1st instance of replication coord is SUCCEEDED - List<String> replicationCoordIDTarget = InstanceUtil + List<String> replicationCoordIDTarget = OozieUtil .getReplicationCoordID(bundleId, cluster1.getFeedHelper()); for (int i = 0; i < 30; i++) { - if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), + if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED - && InstanceUtil.getInstanceStatusFromCoord(cluster1, + && InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; @@ -151,10 +153,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { TimeUtil.sleepSeconds(15); List<String> inputFolderListForColo1 = - InstanceUtil.getInputFoldersForInstanceForReplication(cluster1, + InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 1); List<String> inputFolderListForColo2 = - InstanceUtil.getInputFoldersForInstanceForReplication(cluster1, + InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(1), 1); HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT, @@ -216,16 +218,15 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { TimeUtil.sleepSeconds(60); //wait till 1st instance of replication coord is SUCCEEDED - String bundleId = InstanceUtil - .getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED); - - List<String> replicationCoordIDTarget = InstanceUtil.getReplicationCoordID(bundleId, + String bundleId = OozieUtil + .getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED); + List<String> replicationCoordIDTarget = OozieUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper()); for (int i = 0; i < 30; i++) { - if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), + if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED - && InstanceUtil.getInstanceStatusFromCoord(cluster1, + && InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; @@ -234,19 +235,19 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { TimeUtil.sleepSeconds(20); } - Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, + Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED); - Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, + Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED); TimeUtil.sleepSeconds(15); List<String> inputFolderListForColo1 = InstanceUtil - .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 1); + .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 1); List<String> inputFolderListForColo2 = InstanceUtil - .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1), 1); + .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(1), 1); HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT, inputFolderListForColo1); @@ -258,17 +259,17 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { //check for run id to be 1 Assert.assertEquals( - InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0), + InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0), 1, "id has to be equal 1"); Assert.assertEquals( - InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0), + InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0), 1, "id has to be equal 1"); //wait for lates run to complete for (int i = 0; i < 30; i++) { - if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), + if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED - && InstanceUtil.getInstanceStatusFromCoord(cluster1, + && InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; @@ -276,10 +277,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { LOGGER.info("still in for loop"); TimeUtil.sleepSeconds(20); } - Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, + Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED); - Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, + Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED); @@ -296,10 +297,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { //check for run id to be 2 Assert.assertEquals( - InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0), + InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0), 2, "id has to be equal 2"); Assert.assertEquals( - InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0), + InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0), 2, "id has to be equal 2"); } @@ -381,15 +382,15 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { //wait till 1st instance of replication coord is SUCCEEDED String bundleId = - InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED); + OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED); List<String> replicationCoordIDTarget = - InstanceUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper()); + OozieUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper()); for (int i = 0; i < 30; i++) { - if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), + if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED - && InstanceUtil.getInstanceStatusFromCoord(cluster1, + && InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; @@ -398,10 +399,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { TimeUtil.sleepSeconds(20); } - Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, + Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED, "Replication job should have succeeded."); - Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, + Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED, "Replication job should have succeeded."); @@ -411,14 +412,14 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { // be present. both of them should have _success List<String> inputFolderListForColo1 = InstanceUtil - .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 1); + .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 1); List<String> inputFolderListForColo2 = InstanceUtil - .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1), 1); + .getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(1), 1); String outPutLocation = InstanceUtil - .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 0); + .getOutputFolderForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 0); String outPutBaseLocation = InstanceUtil - .getOutputFolderBaseForInstanceForReplication(cluster1, + .getOutputFolderBaseForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 0); List<String> subFolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation); @@ -439,17 +440,17 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { //check for run id to be 1 Assert.assertTrue( - InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) + InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0) == 1 - && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), + && InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0) == 1, "id have to be equal 1"); //wait for latest run to complete for (int i = 0; i < 30; i++) { - if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), + if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED - && InstanceUtil.getInstanceStatusFromCoord(cluster1, + && InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; } @@ -467,9 +468,9 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 9)); //check for run id to be 2 Assert.assertTrue( - InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) + InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0) == 2 - && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), + && InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0) == 2, "id have to be equal 2"); } @@ -571,15 +572,15 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { //wait till 1st instance of replication coord is SUCCEEDED String bundleId = - InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED); + OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED); List<String> replicationCoordIDTarget = - InstanceUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper()); + OozieUtil.getReplicationCoordID(bundleId, cluster1.getFeedHelper()); for (int i = 0; i < 30; i++) { - if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), + if (InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0) == WorkflowJob.Status.SUCCEEDED - && InstanceUtil.getInstanceStatusFromCoord(cluster1, + && InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; } @@ -588,10 +589,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { TimeUtil.sleepSeconds(20); } - Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, + Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED, "Replication job did not succeed"); - Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, + Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED, "Replication job did not succeed"); @@ -601,17 +602,17 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { be present. both of them should have _success */ List<String> inputFolderListForColo1 = - InstanceUtil.getInputFoldersForInstanceForReplication(cluster1, + InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 1); List<String> inputFolderListForColo2 = - InstanceUtil.getInputFoldersForInstanceForReplication(cluster1, + InstanceUtil.getInputFoldersForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(1), 1); String outPutLocation = InstanceUtil - .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), + .getOutputFolderForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 0); String outPutBaseLocation = InstanceUtil - .getOutputFolderBaseForInstanceForReplication(cluster1, + .getOutputFolderBaseForInstanceForReplication(cluster1OC, replicationCoordIDTarget.get(0), 0); List<String> subfolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation); @@ -634,9 +635,9 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { //check for run id to be 1 Assert.assertTrue( - InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) + InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(0), 0) == 1 - && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), + && InstanceUtil.getInstanceRunIdFromCoord(cluster1OC, replicationCoordIDTarget.get(1), 0) == 1, "id have to be equal 1"); } http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java index c6f72cc..43aafdf 100755 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java @@ -23,13 +23,13 @@ import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.ActionType; import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.response.ServiceResponse; import org.apache.falcon.regression.core.util.AssertUtil; import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; @@ -54,14 +54,12 @@ import java.util.List; @Test(groups = "distributed") public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { - private ColoHelper cluster1 = servers.get(0); - private ColoHelper cluster2 = servers.get(1); - private ColoHelper cluster3 = servers.get(2); private FileSystem cluster1FS = serverFS.get(0); private FileSystem cluster2FS = serverFS.get(1); private FileSystem cluster3FS = serverFS.get(2); private OozieClient cluster1OC = serverOC.get(0); private OozieClient cluster2OC = serverOC.get(1); + private OozieClient cluster3OC = serverOC.get(2); private String testDate = "/2012/10/01/12/"; private String baseTestDir = cleanAndGetTestDir(); private String testBaseDir1 = baseTestDir + "/localDC/rc/billing"; @@ -83,7 +81,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { // pt : partition in target // ps: partition in source - private void uploadDataToServer3(String location, String fileName) throws IOException { HadoopUtil.recreateDir(cluster3FS, location); HadoopUtil.copyDataToFolder(cluster3FS, location, fileName); @@ -96,9 +93,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { @BeforeClass(alwaysRun = true) public void createTestData() throws Exception { - LOGGER.info("creating test data"); - uploadDataToServer3(testDirWithDate + "00/ua2/", testFile1); uploadDataToServer3(testDirWithDate + "05/ua2/", testFile2); uploadDataToServer3(testDirWithDate + "10/ua2/", testFile3); @@ -123,23 +118,19 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { uploadDataToServer3(testBaseDir3 + testDate + "15/ua2/", testFile4); uploadDataToServer3(testBaseDir3 + testDate + "20/ua2/", testFile4); - uploadDataToServer3(testBaseDir3 + testDate + "00/ua1/", testFile1); uploadDataToServer3(testBaseDir3 + testDate + "05/ua1/", testFile2); uploadDataToServer3(testBaseDir3 + testDate + "10/ua1/", testFile3); uploadDataToServer3(testBaseDir3 + testDate + "15/ua1/", testFile4); uploadDataToServer3(testBaseDir3 + testDate + "20/ua1/", testFile4); - uploadDataToServer3(testBaseDir3 + testDate + "00/ua3/", testFile1); uploadDataToServer3(testBaseDir3 + testDate + "05/ua3/", testFile2); uploadDataToServer3(testBaseDir3 + testDate + "10/ua3/", testFile3); uploadDataToServer3(testBaseDir3 + testDate + "15/ua3/", testFile4); uploadDataToServer3(testBaseDir3 + testDate + "20/ua3/", testFile4); - //data for test normalTest_1s2t_pst where both source target partition are required - uploadDataToServer3(testDirWithDateSourceTarget + "00/ua3/ua2/", testFile1); uploadDataToServer3(testDirWithDateSourceTarget + "05/ua3/ua2/", testFile2); uploadDataToServer3(testDirWithDateSourceTarget + "10/ua3/ua2/", testFile3); @@ -156,22 +147,17 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { uploadDataToServer1(testDirWithDateSource1 + "00/ua2/", testFile1); uploadDataToServer1(testDirWithDateSource1 + "05/ua2/", testFile2); - uploadDataToServer1(testDirWithDateSource1 + "00/ua1/", testFile1); uploadDataToServer1(testDirWithDateSource1 + "05/ua1/", testFile2); - uploadDataToServer1(testDirWithDateSource1 + "00/ua3/", testFile1); uploadDataToServer1(testDirWithDateSource1 + "05/ua3/", testFile2); - LOGGER.info("completed creating test data"); - } @BeforeMethod(alwaysRun = true) public void setup() throws Exception { Bundle bundle = BundleUtil.readFeedReplicationBundle(); - for (int i = 0; i < 3; i++) { bundles[i] = new Bundle(bundle, servers.get(i)); bundles[i].generateUniqueBundle(this); @@ -194,9 +180,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { // replication takes // place normally //partition is left blank - Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); - String startTimeUA1 = "2012-10-01T12:05Z"; String startTimeUA2 = "2012-10-01T12:10Z"; @@ -234,8 +218,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString()); TimeUtil.sleepSeconds(10); - AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source " - + "is blank"); + AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source is blank"); } @@ -290,42 +273,26 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { r = prism.getFeedHelper().schedule(feed.toString()); AssertUtil.assertSucceeded(r); TimeUtil.sleepSeconds(15); - HadoopUtil.recreateDir(cluster3FS, testDirWithDate + "00/ua3/"); HadoopUtil.recreateDir(cluster3FS, testDirWithDate + "05/ua3/"); - HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "00/ua3/", - testFile1); - HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "05/ua3/", - testFile2); + HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "00/ua3/", testFile1); + HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "05/ua3/", testFile2); InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); - Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(), - "REPLICATION"), 1); - Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(), - "RETENTION"), 1); - Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(), - "RETENTION"), 1); - Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(), - "RETENTION"), 1); - + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "RETENTION"), 1); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feed.getName(), "RETENTION"), 1); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feed.getName(), "RETENTION"), 1); //check if data has been replicated correctly - //on ua1 only ua1 should be replicated, ua2 only ua2 //number of files should be same as source - - List<Path> ua2ReplicatedData = HadoopUtil .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2)); AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua2"); - List<Path> ua3ReplicatedData00 = HadoopUtil .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "00/ua3/")); List<Path> ua3ReplicatedData05 = HadoopUtil @@ -388,31 +355,18 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(), - "REPLICATION"), 1); - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(), - "RETENTION"), 1); - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(), - "RETENTION"), 1); - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(), - "RETENTION"), 1); - + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 1); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "RETENTION"), 1); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feed.getName(), "RETENTION"), 1); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feed.getName(), "RETENTION"), 1); //check if data has been replicated correctly - //on ua1 only ua1 should be replicated, ua2 only ua2 //number of files should be same as source - - List<Path> ua2ReplicatedData = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2)); AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua3"); - List<Path> ua3ReplicatedData00 = HadoopUtil .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "00/ua2/")); List<Path> ua3ReplicatedData05 = HadoopUtil @@ -439,7 +393,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { // (00 to 30) //data should be replicated to folder on cluster1 and cluster2 as targets //ua3 is the source and ua1 and ua2 are target - Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); String startTimeUA1 = "2012-10-01T12:05Z"; String startTimeUA2 = "2012-10-01T12:10Z"; @@ -493,19 +446,16 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { //on ua1 only ua1 should be replicated, ua2 only ua2 //number of files should be same as source - - List<Path> ua1ReplicatedData = HadoopUtil .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir3 + testDate)); + //check for no ua2 or ua3 in ua1 AssertUtil.failIfStringFoundInPath(ua1ReplicatedData, "ua2", "ua3"); List<Path> ua2ReplicatedData = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster2FS, - new Path(testBaseDir3 + testDate)); + .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir3 + testDate)); AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1", "ua3"); - List<Path> ua1ReplicatedData00 = HadoopUtil .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir3 + testDate + "00/")); List<Path> ua1ReplicatedData10 = HadoopUtil @@ -539,7 +489,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { //cluster2 is the target // Since there is no partition expression in source clusters, the feed submission should // fail (FALCON-305). - Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); String startTimeUA1 = "2012-10-01T12:05Z"; @@ -650,18 +599,15 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { //on ua1 only ua1 should be replicated, ua2 only ua2 //number of files should be same as source - - List<Path> ua1ReplicatedData = HadoopUtil .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir1 + "/ua1" + testDate)); + //check for no ua2 or ua3 in ua1 AssertUtil.failIfStringFoundInPath(ua1ReplicatedData, "ua2"); - List<Path> ua2ReplicatedData = HadoopUtil .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate)); AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1"); - List<Path> ua1ReplicatedData05 = HadoopUtil .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir1 + "/ua1" + testDate + "05/")); @@ -687,7 +633,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { AssertUtil.checkForListSizes(ua1ReplicatedData05, ua3OriginalData05ua1); AssertUtil.checkForListSizes(ua2ReplicatedData10, ua3OriginalData10ua2); AssertUtil.checkForListSizes(ua2ReplicatedData15, ua3OriginalData15ua2); - } @@ -702,7 +647,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { //data should be replicated to cluster2 from ua2 sub dir of cluster3 and cluster1 // source cluster path in cluster1 should be mentioned in cluster definition // path for data in target cluster should also be customized - Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); String startTimeUA1 = "2012-10-01T12:00Z"; @@ -754,8 +698,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { //on ua1 only ua1 should be replicated, ua2 only ua2 //number of files should be same as source - - List<Path> ua2ReplicatedData = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2 + "/replicated" + testDate)); AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua2"); @@ -765,7 +707,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { List<Path> ua2ReplicatedData05ua3 = HadoopUtil.getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir2 + "/replicated" + testDate + "05/ua3/")); - List<Path> ua1OriginalData00 = HadoopUtil .getAllFilesRecursivelyHDFS(cluster1FS, new Path( testBaseDirServer1Source + testDate + "00/ua1")); @@ -776,11 +717,8 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { AssertUtil.checkForListSizes(ua2ReplicatedData05ua3, ua3OriginalData05); } - @Test(enabled = true) public void normalTest1Source2TargetPartitionedSourceTarget() throws Exception { - - //this test is for ideal condition when data is present in all the required places and // replication takes // place normally @@ -843,8 +781,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { //on ua1 only ua1 should be replicated, ua2 only ua2 //number of files should be same as source - - List<Path> ua1ReplicatedData = HadoopUtil .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir1 + "/ua1" + testDate)); //check for no ua2 in ua1 @@ -854,7 +790,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate)); AssertUtil.failIfStringFoundInPath(ua2ReplicatedData, "ua1"); - List<Path> ua1ReplicatedData05 = HadoopUtil .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir1 + "/ua1" + testDate + "05/")); @@ -867,7 +802,6 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { List<Path> ua2ReplicatedData15 = HadoopUtil .getAllFilesRecursivelyHDFS(cluster2FS, new Path(testBaseDir1 + "/ua2" + testDate + "15")); - List<Path> ua3OriginalData05ua1 = HadoopUtil .getAllFilesRecursivelyHDFS(cluster3FS, new Path( testDirWithDateSourceTarget + "05/ua3/ua1"));
