Repository: falcon Updated Branches: refs/heads/master 9d5429a41 -> d0c9850e5
http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java index 6f60bb8..c7d9d2d 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java @@ -30,12 +30,14 @@ import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; import org.apache.oozie.client.CoordinatorAction.Status; +import org.apache.oozie.client.OozieClient; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -53,6 +55,9 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { private ColoHelper cluster1 = servers.get(0); private ColoHelper cluster2 = servers.get(1); private ColoHelper cluster3 = servers.get(2); + private OozieClient cluster1OC = serverOC.get(0); + private OozieClient cluster2OC = serverOC.get(1); + private OozieClient cluster3OC = serverOC.get(2); private FileSystem cluster1FS = serverFS.get(0); private FileSystem cluster2FS = serverFS.get(1); private FileSystem cluster3FS = serverFS.get(2); @@ -156,25 +161,12 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { //update feed AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString())); - - Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), - Util.readEntityName(feed.toString()), - "REPLICATION"), 0); - Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), - Util.readEntityName(feed.toString()), - "RETENTION"), 2); - Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), - Util.readEntityName(feed.toString()), - "REPLICATION"), 0); - Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), - Util.readEntityName(feed.toString()), - "RETENTION"), 2); - Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed.toString()), - "REPLICATION"), 4); - Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed.toString()), - "RETENTION"), 2); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "REPLICATION"), 0); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, feed.getName(), "RETENTION"), 2); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feed.getName(), "REPLICATION"), 0); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, feed.getName(), "RETENTION"), 2); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feed.getName(), "REPLICATION"), 4); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, feed.getName(), "RETENTION"), 2); } /** http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java index d855e33..cbd704f 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java @@ -220,11 +220,11 @@ public class PrismFeedUpdateTest extends BaseTestClass { bundles[0].submitAndScheduleAllFeeds(); bundles[0].submitAndScheduleProcess(); - InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, bundles[0].getProcessName(), 0, 0); - InstanceUtil.waitForBundleToReachState(cluster1, bundles[0].getProcessName(), + OozieUtil.waitForBundleToReachState(cluster1OC, bundles[0].getProcessName(), Job.Status.SUCCEEDED, 20); FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); @@ -250,7 +250,7 @@ public class PrismFeedUpdateTest extends BaseTestClass { bundles[0].submitAndScheduleAllFeeds(); bundles[0].submitAndScheduleProcess(); - InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, bundles[0].getProcessName(), 0, 0); http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java index 4555221..c5bfdf1 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java @@ -333,21 +333,21 @@ public class PrismProcessScheduleTest extends BaseTestClass { bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, bundles[0].getProcessName(), 0); InstanceUtil.waitTillInstanceReachState(cluster1OC, bundles[0].getProcessName(), 2, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); - InstanceUtil.waitForBundleToReachState(cluster1, - Util.readEntityName(process), Job.Status.KILLED); - String oldBundleID = InstanceUtil.getLatestBundleID(cluster1, + OozieUtil.waitForBundleToReachState(cluster1OC, + Util.readEntityName(process), Job.Status.KILLED); + String oldBundleID = OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(process), EntityType.PROCESS); prism.getProcessHelper().delete(process); bundles[0].submitAndScheduleProcess(); - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleID, new ArrayList<String>(), process, true, false); } finally { if (hadoopFileEditor != null) { http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java index 8d7ac7f..3f7f117 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java @@ -28,6 +28,7 @@ import org.apache.falcon.regression.core.util.BundleUtil; import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.hadoop.fs.FileSystem; @@ -112,16 +113,13 @@ public class RescheduleProcessInFinalStatesTest extends BaseTestClass { */ @Test(enabled = true) public void rescheduleSucceeded() throws Exception { - InstanceUtil - .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED); + OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Status.SUCCEEDED); prism.getProcessHelper().delete(bundles[0].getProcessData()); checkNotFoundDefinition(bundles[0].getProcessData()); //submit and schedule process again - AssertUtil.assertSucceeded(prism.getProcessHelper() - .submitAndSchedule(bundles[0].getProcessData())); - InstanceUtil - .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED); + AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData())); + OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Status.SUCCEEDED); } /** @@ -134,16 +132,13 @@ public class RescheduleProcessInFinalStatesTest extends BaseTestClass { */ @Test(enabled = false) public void rescheduleFailed() throws Exception { - InstanceUtil - .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED); + OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Status.SUCCEEDED); prism.getProcessHelper().delete(bundles[0].getProcessData()); checkNotFoundDefinition(bundles[0].getProcessData()); //submit and schedule process again - AssertUtil.assertSucceeded(prism.getProcessHelper() - .submitAndSchedule(bundles[0].getProcessData())); - InstanceUtil - .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED); + AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData())); + OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Status.SUCCEEDED); } /** @@ -157,21 +152,17 @@ public class RescheduleProcessInFinalStatesTest extends BaseTestClass { public void rescheduleDWE() throws Exception { InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS); - prism.getProcessHelper() - .getProcessInstanceKill(bundles[0].getProcessName(), + prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(), "?start=2010-01-02T01:05Z&end=2010-01-02T01:11Z"); - InstanceUtil - .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.DONEWITHERROR); + OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Status.DONEWITHERROR); //delete the process prism.getProcessHelper().delete(bundles[0].getProcessData()); checkNotFoundDefinition(bundles[0].getProcessData()); //submit and schedule process again - AssertUtil.assertSucceeded(prism.getProcessHelper() - .submitAndSchedule(bundles[0].getProcessData())); - InstanceUtil - .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED); + AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData())); + OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Status.SUCCEEDED); } /** @@ -181,15 +172,12 @@ public class RescheduleProcessInFinalStatesTest extends BaseTestClass { @Test(enabled = true) public void rescheduleKilled() throws Exception { prism.getProcessHelper().delete(bundles[0].getProcessData()); - InstanceUtil - .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.KILLED); + OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Status.KILLED); checkNotFoundDefinition(bundles[0].getProcessData()); //submit and schedule process again - AssertUtil.assertSucceeded(prism.getProcessHelper() - .submitAndSchedule(bundles[0].getProcessData())); - InstanceUtil - .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED); + AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData())); + OozieUtil.waitForBundleToReachState(clusterOC, bundles[0].getProcessName(), Status.SUCCEEDED); } /** http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java index 99e240d..39b7d37 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java @@ -221,7 +221,7 @@ public class RetentionTest extends BaseTestClass { private void validateDataFromFeedQueue(String feedName, List<MapMessage> messages, List<String> missingData) throws OozieClientException, JMSException { //just verify that each element in queue is same as deleted data! - List<String> workflowIds = OozieUtil.getWorkflowJobs(cluster, + List<String> workflowIds = OozieUtil.getWorkflowJobs(clusterOC, OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0)); //create queue data folderList: http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java index 14d76af..9ff8016 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java @@ -40,6 +40,7 @@ import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; import org.custommonkey.xmlunit.Diff; import org.custommonkey.xmlunit.XMLUnit; @@ -68,6 +69,9 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { private ColoHelper cluster1 = servers.get(0); private ColoHelper cluster2 = servers.get(1); private ColoHelper cluster3 = servers.get(2); + private OozieClient cluster1OC = serverOC.get(0); + private OozieClient cluster2OC = serverOC.get(1); + private OozieClient cluster3OC = serverOC.get(2); private FileSystem cluster2FS = serverFS.get(1); private final String baseTestDir = cleanAndGetTestDir(); private String aggregateWorkflowDir = baseTestDir + "/aggregator"; @@ -106,7 +110,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(0), TimeUtil.getTimeWrtSystemTime(20)); processBundle.submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, processBundle.getProcessData(), 0); String oldProcess = processBundle.getProcessData(); processBundle.setProcessValidity(TimeUtil.getTimeWrtSystemTime(5), @@ -123,7 +127,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { OozieClientException, InterruptedException { String feed = submitAndScheduleFeed(processBundle); - InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, feed, 0); //update frequency FeedMerlin updatedFeed = new FeedMerlin(feed); @@ -144,11 +148,11 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { //get old process details String oldProcess = processBundle.getProcessData(); - String oldBundleId = InstanceUtil.getLatestBundleID(cluster1, + String oldBundleId = OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS); - InstanceUtil.waitTillInstancesAreCreated(cluster1, oldProcess, 0); - List<String> initialNominalTimes = OozieUtil.getActionsNominalTime(cluster1, + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, oldProcess, 0); + List<String> initialNominalTimes = OozieUtil.getActionsNominalTime(cluster1OC, oldBundleId, EntityType.PROCESS); // update process by adding property @@ -158,10 +162,10 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { AssertUtil.assertSucceeded(r); //check new coord created with current time - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleId, initialNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleId, initialNominalTimes, processBundle.getProcessData(), true, false); - InstanceUtil.waitTillInstancesAreCreated(cluster1, oldProcess, 1); - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleId, initialNominalTimes, + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, oldProcess, 1); + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleId, initialNominalTimes, processBundle.getProcessData(), true, true); } @@ -179,7 +183,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { ServiceResponse r = prism.getFeedHelper().submitAndSchedule(feed); TimeUtil.sleepSeconds(10); AssertUtil.assertSucceeded(r); - InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, feed, 0); //update frequency FeedMerlin updatedFeed = new FeedMerlin(feed); @@ -187,22 +191,13 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { r = prism.getFeedHelper().update(feed, updatedFeed.toString(), TimeUtil.getTimeWrtSystemTime(-10000), null); AssertUtil.assertSucceeded(r); - InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, feed, 1); //check correct number of coord exists or not - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster1.getFeedHelper(), - Util.readEntityName(feed), - "REPLICATION"), 2); - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), - "RETENTION"), 2); - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed), - "RETENTION"), 2); - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed), - "RETENTION"), 2); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, updatedFeed.getName(), "REPLICATION"), 2); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster2OC, updatedFeed.getName(), "RETENTION"), 2); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster1OC, updatedFeed.getName(), "RETENTION"), 2); + Assert.assertEquals(OozieUtil.checkIfFeedCoordExist(cluster3OC, updatedFeed.getName(), "RETENTION"), 2); } @Test(groups = {"MultiCluster", "0.3.1", "distributed"}, timeOut = 1200000, enabled = true) @@ -232,22 +227,22 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { //schedule of 2 cluster cluster1.getProcessHelper().schedule(processBundle.getProcessData()); cluster2.getProcessHelper().schedule(processBundle.getProcessData()); - InstanceUtil.waitTillInstancesAreCreated(cluster2, processBundle.getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster2OC, processBundle.getProcessData(), 0); //shut down cluster2 Util.shutDownService(cluster2.getProcessHelper()); // save old data before update String oldProcess = processBundle.getProcessData(); - String oldBundleIDCluster1 = InstanceUtil - .getLatestBundleID(cluster1, + String oldBundleIDCluster1 = OozieUtil + .getLatestBundleID(cluster1OC, Util.readEntityName(oldProcess), EntityType.PROCESS); - String oldBundleIDCluster2 = InstanceUtil - .getLatestBundleID(cluster2, + String oldBundleIDCluster2 = OozieUtil + .getLatestBundleID(cluster2OC, Util.readEntityName(oldProcess), EntityType.PROCESS); - List<String> oldNominalTimesCluster1 = OozieUtil.getActionsNominalTime(cluster1, + List<String> oldNominalTimesCluster1 = OozieUtil.getActionsNominalTime(cluster1OC, oldBundleIDCluster1, EntityType.PROCESS); - List<String> oldNominalTimesCluster2 = OozieUtil.getActionsNominalTime(cluster2, + List<String> oldNominalTimesCluster2 = OozieUtil.getActionsNominalTime(cluster2OC, oldBundleIDCluster2, EntityType.PROCESS); //update process validity @@ -258,13 +253,13 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { ServiceResponse r = prism.getProcessHelper() .update(oldProcess, processBundle.getProcessData(), updateTime); AssertUtil.assertPartial(r); - InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, processBundle.getProcessData(), 1); //verify new bundle on cluster1 and definition on cluster3 OozieUtil - .verifyNewBundleCreation(cluster1, oldBundleIDCluster1, oldNominalTimesCluster1, + .verifyNewBundleCreation(cluster1OC, oldBundleIDCluster1, oldNominalTimesCluster1, oldProcess, true, false); - OozieUtil.verifyNewBundleCreation(cluster2, oldBundleIDCluster2, + OozieUtil.verifyNewBundleCreation(cluster2OC, oldBundleIDCluster2, oldNominalTimesCluster2, oldProcess, false, false); String definitionCluster3 = Util.getEntityDefinition(cluster3, @@ -275,7 +270,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { //start the stopped cluster2 Util.startService(cluster2.getProcessHelper()); TimeUtil.sleepSeconds(40); - String newBundleIdCluster1 = InstanceUtil.getLatestBundleID(cluster1, + String newBundleIdCluster1 = OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(oldProcess), EntityType.PROCESS); //send second update request @@ -287,16 +282,16 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { LOGGER.info("defCluster2 : " + Util.prettyPrintXml(defCluster2)); // verify new bundle in cluster2 and no new bundle in cluster1 and - OozieUtil.verifyNewBundleCreation(cluster1, newBundleIdCluster1, + OozieUtil.verifyNewBundleCreation(cluster1OC, newBundleIdCluster1, oldNominalTimesCluster1, oldProcess, false, false); - OozieUtil.verifyNewBundleCreation(cluster2, oldBundleIDCluster2, + OozieUtil.verifyNewBundleCreation(cluster2OC, oldBundleIDCluster2, oldNominalTimesCluster2, oldProcess, true, false); //wait till update time is reached TimeUtil.sleepTill(updateTime); - OozieUtil.verifyNewBundleCreation(cluster2, oldBundleIDCluster2, + OozieUtil.verifyNewBundleCreation(cluster2OC, oldBundleIDCluster2, oldNominalTimesCluster2, oldProcess, true, true); - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleIDCluster1, + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleIDCluster1, oldNominalTimesCluster1, oldProcess, true, true); } finally { Util.restartService(cluster2.getProcessHelper()); @@ -321,7 +316,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { AssertUtil.assertSucceeded(r); r = cluster2.getFeedHelper().schedule(feed); AssertUtil.assertSucceeded(r); - InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, feed, 0); //shutdown cluster2 Util.shutDownService(cluster2.getProcessHelper()); @@ -330,9 +325,9 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { FeedMerlin updatedFeed = new FeedMerlin(feed).setFeedProperty("someProp", "someVal"); //save old data - String oldBundleCluster1 = InstanceUtil.getLatestBundleID(cluster1, + String oldBundleCluster1 = OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED); - List<String> oldNominalTimesCluster1 = OozieUtil.getActionsNominalTime(cluster1, + List<String> oldNominalTimesCluster1 = OozieUtil.getActionsNominalTime(cluster1OC, oldBundleCluster1, EntityType.FEED); //send update command with +5 mins in future @@ -341,7 +336,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { AssertUtil.assertPartial(r); //verify new bundle creation on cluster1 and new definition on cluster3 - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleCluster1, + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleCluster1, oldNominalTimesCluster1, feed, true, false); String definition = Util.getEntityDefinition(cluster3, feed, true); Diff diff = XMLUnit.compareXML(definition, processBundle.getProcessData()); @@ -349,7 +344,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { //start stopped cluster2 Util.startService(cluster2.getProcessHelper()); - String newBundleCluster1 = InstanceUtil.getLatestBundleID(cluster1, + String newBundleCluster1 = OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED); //send update again @@ -357,17 +352,16 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { AssertUtil.assertSucceeded(r); //verify new bundle creation on cluster2 and no new bundle on cluster1 - Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), - "RETENTION"), 2); - OozieUtil.verifyNewBundleCreation(cluster1, newBundleCluster1, + Assert.assertEquals(OozieUtil + .checkIfFeedCoordExist(cluster2OC, Util.readEntityName(feed), "RETENTION"), 2); + OozieUtil.verifyNewBundleCreation(cluster1OC, newBundleCluster1, oldNominalTimesCluster1, feed, false, false); //wait till update time is reached TimeUtil.sleepTill(TimeUtil.getTimeWrtSystemTime(5)); //verify new bundle creation with instance matching - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleCluster1, + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleCluster1, oldNominalTimesCluster1, feed, true, true); } finally { Util.restartService(cluster2.getProcessHelper()); @@ -392,11 +386,9 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { //save old data String oldProcess = processBundle.getProcessData(); - String oldBundleID = InstanceUtil - .getLatestBundleID(cluster1, - Util.readEntityName(oldProcess), EntityType.PROCESS); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1, oldBundleID, - EntityType.PROCESS); + String oldBundleID = OozieUtil.getLatestBundleID(cluster1OC, + Util.readEntityName(oldProcess), EntityType.PROCESS); + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1OC, oldBundleID, EntityType.PROCESS); //update processBundle.setProcessProperty("someProp", "someVal"); @@ -409,10 +401,10 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { AssertUtil.assertSucceeded(r); //verify new bundle creation with instances matching - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleID, oldNominalTimes, oldProcess, true, false); - InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1); - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes, + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, processBundle.getProcessData(), 1); + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleID, oldNominalTimes, oldProcess, true, true); } @@ -440,10 +432,10 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { AssertUtil.assertSucceeded(r); r = prism.getFeedHelper().submitAndSchedule(feed); AssertUtil.assertSucceeded(r); - InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, feed, 0); //save old data - String oldBundleID = InstanceUtil.getLatestBundleID(cluster1, + String oldBundleID = OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(feed), EntityType.FEED); String updateTime = TimeUtil.addMinsToTime(endTime, 60); FeedMerlin updatedFeed = new FeedMerlin(feed).setFeedProperty("someProp", "someVal"); @@ -452,10 +444,10 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { LOGGER.info("Update Time : " + updateTime); r = prism.getFeedHelper().update(feed, updatedFeed.toString(), updateTime, null); AssertUtil.assertSucceeded(r); - InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, feed, 1); //verify new bundle creation - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, null, feed, true, false); + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleID, null, feed, true, false); } @Test(groups = {"multiCluster", "0.3.1", "embedded"}, timeOut = 1200000, enabled = true) @@ -472,9 +464,9 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { //save old data String oldProcess = processBundle.getProcessData(); - String oldBundleID = InstanceUtil.getLatestBundleID(cluster1, + String oldBundleID = OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(oldProcess), EntityType.PROCESS); - List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1, oldBundleID, + List<String> oldNominalTimes = OozieUtil.getActionsNominalTime(cluster1OC, oldBundleID, EntityType.PROCESS); processBundle.setProcessValidity(TimeUtil.addMinsToTime(startTime, -4), endTime); String updateTime = TimeUtil.getTimeWrtSystemTime(2); @@ -484,7 +476,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { TimeUtil.sleepSeconds(10); //verify new bundle creation - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID, oldNominalTimes, + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleID, oldNominalTimes, oldProcess, true, false); } @@ -512,19 +504,19 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { processBundle.submitFeedsScheduleProcess(prism); //wait for coord to be in running state - InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0); - InstanceUtil.waitTillInstancesAreCreated(cluster3, processBundle.getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, processBundle.getProcessData(), 0); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, processBundle.getProcessData(), 0); //save old info - String oldBundleIdCluster1 = InstanceUtil.getLatestBundleID(cluster1, + String oldBundleIdCluster1 = OozieUtil.getLatestBundleID(cluster1OC, Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS); - List<String> nominalTimesCluster1 = OozieUtil.getActionsNominalTime(cluster1, + List<String> nominalTimesCluster1 = OozieUtil.getActionsNominalTime(cluster1OC, oldBundleIdCluster1, EntityType.PROCESS); - String oldBundleIdCluster2 = InstanceUtil.getLatestBundleID(cluster2, + String oldBundleIdCluster2 = OozieUtil.getLatestBundleID(cluster2OC, Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS); - String oldBundleIdCluster3 = InstanceUtil.getLatestBundleID(cluster3, + String oldBundleIdCluster3 = OozieUtil.getLatestBundleID(cluster3OC, Util.readEntityName(processBundle.getProcessData()), EntityType.PROCESS); - List<String> nominalTimesCluster3 = OozieUtil.getActionsNominalTime(cluster3, + List<String> nominalTimesCluster3 = OozieUtil.getActionsNominalTime(cluster3OC, oldBundleIdCluster3, EntityType.PROCESS); //update process @@ -535,22 +527,22 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { AssertUtil.assertSucceeded(r); //check for new bundle to be created - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleIdCluster1, + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleIdCluster1, nominalTimesCluster1, processBundle.getProcessData(), true, false); - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleIdCluster3, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleIdCluster3, nominalTimesCluster3, processBundle.getProcessData(), true, false); - OozieUtil.verifyNewBundleCreation(cluster2, oldBundleIdCluster2, + OozieUtil.verifyNewBundleCreation(cluster2OC, oldBundleIdCluster2, nominalTimesCluster3, processBundle.getProcessData(), true, false); //wait till new coord are running on cluster1 - InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 1); - OozieUtil.verifyNewBundleCreation(cluster1, oldBundleIdCluster1, + InstanceUtil.waitTillInstancesAreCreated(cluster1OC, processBundle.getProcessData(), 1); + OozieUtil.verifyNewBundleCreation(cluster1OC, oldBundleIdCluster1, nominalTimesCluster1, processBundle.getProcessData(), true, true); //verify - String coordStartTimeCluster3 = OozieUtil.getCoordStartTime(cluster3, + String coordStartTimeCluster3 = OozieUtil.getCoordStartTime(cluster3OC, processBundle.getProcessData(), 1); - String coordStartTimeCluster2 = OozieUtil.getCoordStartTime(cluster2, + String coordStartTimeCluster2 = OozieUtil.getCoordStartTime(cluster2OC, processBundle.getProcessData(), 1); DateTime updateTimeOozie = TimeUtil.oozieDateToDate(updateTime); @@ -561,10 +553,10 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { TimeUtil.oozieDateToDate(coordStartTimeCluster2).isEqual(updateTimeOozie), "new coord start time is not correct"); TimeUtil.sleepTill(updateTime); - InstanceUtil.waitTillInstancesAreCreated(cluster3, processBundle.getProcessData(), 1); + InstanceUtil.waitTillInstancesAreCreated(cluster3OC, processBundle.getProcessData(), 1); //verify that no instance are missing - OozieUtil.verifyNewBundleCreation(cluster3, oldBundleIdCluster3, + OozieUtil.verifyNewBundleCreation(cluster3OC, oldBundleIdCluster3, nominalTimesCluster3, processBundle.getProcessData(), true, true); } http://git-wip-us.apache.org/repos/asf/falcon/blob/d0c9850e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java index 040057e..8e81dcd 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java @@ -192,16 +192,16 @@ public class ProcessUITest extends BaseUITestClass { ProcessPage processPage = new ProcessPage(getDriver(), cluster, processName); processPage.navigateTo(); - String bundleID = InstanceUtil.getLatestBundleID(cluster, processName, EntityType.PROCESS); - Map<Date, CoordinatorAction.Status> actions = OozieUtil.getActionsNominalTimeAndStatus(prism, bundleID, - EntityType.PROCESS); + String bundleID = OozieUtil.getLatestBundleID(clusterOC, processName, EntityType.PROCESS); + Map<Date, CoordinatorAction.Status> actions = OozieUtil.getActionsNominalTimeAndStatus( + clusterOC, bundleID, EntityType.PROCESS); checkActions(actions, processPage); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); processPage.refresh(); - actions = OozieUtil.getActionsNominalTimeAndStatus(prism, bundleID, EntityType.PROCESS); + actions = OozieUtil.getActionsNominalTimeAndStatus(clusterOC, bundleID, EntityType.PROCESS); checkActions(actions, processPage); softAssert.assertAll();
