Repository: falcon Updated Branches: refs/heads/master d718ad737 -> 5b5113d19
http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java index 3a5d71b..4466c13 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java @@ -22,6 +22,7 @@ import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.Frequency.TimeUnit; import org.apache.falcon.entity.v0.feed.ClusterType; import org.apache.falcon.entity.v0.process.ExecutionType; +import org.apache.falcon.regression.Entities.FeedMerlin; import org.apache.falcon.regression.Entities.ProcessMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.regression.core.helpers.ColoHelper; @@ -154,8 +155,8 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { } String prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(), - Util.getProcessObject(updatedProcess).getFrequency()); + Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(), + new ProcessMerlin(updatedProcess).getFrequency()); TimeUtil.sleepSeconds(60); dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted @@ -224,7 +225,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { // one is updated correctly. int finalNumberOfInstances = InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, - Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); + bundles[1].getProcessName(), EntityType.PROCESS).size(); Assert.assertEquals(finalNumberOfInstances, getExpectedNumberOfWorkflowInstances(TimeUtil .dateToOozieDate( @@ -268,19 +269,15 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { Util.shutDownService(cluster3.getProcessHelper()); //now to update - AssertUtil.assertPartial( - prism.getProcessHelper() - .update(bundles[1].getProcessData(), bundles[1].getProcessData())); + AssertUtil.assertPartial(prism.getProcessHelper() + .update(bundles[1].getProcessData(), bundles[1].getProcessData())); String prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), - initialConcurrency); - Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(), - workflowPath); - Assert.assertEquals(Util.getProcessObject(prismString).getOrder(), - bundles[1].getProcessObject().getOrder()); + Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), initialConcurrency); + Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(), workflowPath); + Assert.assertEquals(new ProcessMerlin(prismString).getOrder(), bundles[1].getProcessObject().getOrder()); String coloString = getResponse(cluster2, bundles[1].getProcessData(), true); - Assert.assertEquals(Util.getProcessObject(coloString).getWorkflow().getPath(), + Assert.assertEquals(new ProcessMerlin(coloString).getWorkflow().getPath(), workflowPath2); Util.startService(cluster3.getProcessHelper()); @@ -300,11 +297,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(20); } prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), + Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), initialConcurrency + 3); - Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(), + Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(), workflowPath2); - Assert.assertEquals(Util.getProcessObject(prismString).getOrder(), + Assert.assertEquals(new ProcessMerlin(prismString).getOrder(), bundles[1].getProcessObject().getOrder()); dualComparison(prism, cluster3, bundles[1].getProcessData()); AssertUtil @@ -312,7 +309,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { waitingForBundleFinish(cluster3, oldBundleId); int finalNumberOfInstances = InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, - Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); + bundles[1].getProcessName(), EntityType.PROCESS).size(); int expectedInstances = getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( @@ -363,8 +360,8 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); String prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(), - Util.getProcessObject(updatedProcess).getFrequency()); + Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(), + new ProcessMerlin(updatedProcess).getFrequency()); dualComparison(prism, cluster3, bundles[1].getProcessData()); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); } @@ -433,7 +430,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { } String prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), + Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), bundles[1].getProcessObject().getParallel() + 3); dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted @@ -552,7 +549,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { // correctly. int finalNumberOfInstances = InstanceUtil .getProcessInstanceList(cluster3, - Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS) + bundles[1].getProcessName(), EntityType.PROCESS) .size(); Assert.assertEquals(finalNumberOfInstances, getExpectedNumberOfWorkflowInstances(TimeUtil @@ -601,7 +598,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { } String prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), + Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), bundles[1].getProcessObject().getParallel() + 3); dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted @@ -639,7 +636,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { int finalNumberOfInstances = InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, - Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); + bundles[1].getProcessName(), EntityType.PROCESS).size(); int expectedInstances = getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( @@ -690,7 +687,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { String prismString = getResponse(prism, bundles[1].getProcessData(), true); dualComparison(prism, cluster3, bundles[1].getProcessData()); - Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), + Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), bundles[1].getProcessObject().getParallel()); //ensure that the running process has new coordinators created; while the submitted @@ -743,7 +740,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { int finalNumberOfInstances = InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, - Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); + bundles[1].getProcessName(), EntityType.PROCESS).size(); int expectedInstances = getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( @@ -797,11 +794,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { TimeUtil.sleepSeconds(10); } String prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), + Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), initialConcurrency + 3); - Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(), + Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(), aggregator1Path); - Assert.assertEquals(Util.getProcessObject(prismString).getOrder(), + Assert.assertEquals(new ProcessMerlin(prismString).getOrder(), bundles[1].getProcessObject().getOrder()); dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted @@ -812,7 +809,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); int finalNumberOfInstances = InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, - Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); + bundles[1].getProcessName(), EntityType.PROCESS).size(); int expectedInstances = getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( bundles[1].getProcessObject().getClusters().getClusters().get(0) @@ -871,11 +868,11 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded(cluster3.getProcessHelper().resume(bundles[1].getProcessData())); String prismString = getResponse(prism, bundles[1].getProcessData(), true); - Assert.assertEquals(Util.getProcessObject(prismString).getParallel(), + Assert.assertEquals(new ProcessMerlin(prismString).getParallel(), initialConcurrency + 3); - Assert.assertEquals(Util.getProcessObject(prismString).getWorkflow().getPath(), + Assert.assertEquals(new ProcessMerlin(prismString).getWorkflow().getPath(), aggregator1Path); - Assert.assertEquals(Util.getProcessObject(prismString).getOrder(), + Assert.assertEquals(new ProcessMerlin(prismString).getOrder(), bundles[1].getProcessObject().getOrder()); dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted @@ -886,7 +883,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { AssertUtil.checkNotStatus(cluster3OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); int finalNumberOfInstances = InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, - Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); + bundles[1].getProcessName(), EntityType.PROCESS).size(); int expectedInstances = getExpectedNumberOfWorkflowInstances(TimeUtil.dateToOozieDate( @@ -924,13 +921,13 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { EntityType.PROCESS); String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2"; - String inputFeed = bundles[1].getInputFeedFromBundle(); + FeedMerlin inputFeed = new FeedMerlin(bundles[1].getInputFeedFromBundle()); bundles[1].addProcessInput(newFeedName, "inputData2"); - inputFeed = Util.setFeedName(inputFeed, newFeedName); + inputFeed.setName(newFeedName); LOGGER.info(inputFeed); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed)); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed.toString())); while (Util.parseResponse( prism.getProcessHelper() @@ -975,14 +972,14 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { EntityType.PROCESS); String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2"; - String inputFeed = bundles[1].getInputFeedFromBundle(); + FeedMerlin inputFeed = new FeedMerlin(bundles[1].getInputFeedFromBundle()); bundles[1].addProcessInput(newFeedName, "inputData2"); - inputFeed = Util.setFeedName(inputFeed, newFeedName); + inputFeed.setName(newFeedName); AssertUtil.assertSucceeded( cluster3.getProcessHelper().suspend(bundles[1].getProcessData())); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed)); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed.toString())); while (Util.parseResponse( prism.getProcessHelper() @@ -1018,9 +1015,9 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { bundles[1].submitBundle(prism); String originalProcess = bundles[1].getProcessData(); String newFeedName = bundles[1].getInputFeedNameFromBundle() + "2"; - String inputFeed = bundles[1].getInputFeedFromBundle(); + FeedMerlin inputFeed = new FeedMerlin(bundles[1].getInputFeedFromBundle()); bundles[1].addProcessInput(newFeedName, "inputData2"); - inputFeed = Util.setFeedName(inputFeed, newFeedName); + inputFeed.setName(newFeedName); String updatedProcess = bundles[1].getProcessData(); //now to schedule in 1 colo and let it remain in another @@ -1038,7 +1035,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { EntityType.PROCESS); //submit new feed - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed)); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(inputFeed.toString())); Util.shutDownService(cluster3.getProcessHelper()); @@ -1126,7 +1123,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { // one is updated correctly. int finalNumberOfInstances = InstanceUtil .getProcessInstanceList(cluster3, - Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS) + bundles[1].getProcessName(), EntityType.PROCESS) .size(); Assert.assertEquals(finalNumberOfInstances, getExpectedNumberOfWorkflowInstances(bundles[1] @@ -1189,7 +1186,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { // one is updated correctly. int finalNumberOfInstances = InstanceUtil .getProcessInstanceList(cluster3, - Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS) + bundles[1].getProcessName(), EntityType.PROCESS) .size(); Assert.assertEquals(finalNumberOfInstances, getExpectedNumberOfWorkflowInstances(bundles[1] @@ -1244,7 +1241,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(cluster3OC, bundles[1].getProcessData(), 1, 10); String prismString = dualComparison(prism, cluster2, bundles[1].getProcessData()); - Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(), + Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(), new Frequency("" + 5, TimeUnit.minutes)); dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted @@ -1297,7 +1294,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { prism.getProcessHelper().update(updatedProcess, updatedProcess); AssertUtil.assertSucceeded(response); String prismString = dualComparison(prism, cluster3, bundles[1].getProcessData()); - Assert.assertEquals(Util.getProcessObject(prismString).getFrequency(), + Assert.assertEquals(new ProcessMerlin(prismString).getFrequency(), new Frequency("" + 1, TimeUnit.months)); dualComparison(prism, cluster3, bundles[1].getProcessData()); //ensure that the running process has new coordinators created; while the submitted @@ -1388,8 +1385,8 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { //ensure that the running process has new coordinators created; while the submitted // one is updated correctly. int finalNumberOfInstances = - InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, - Util.getProcessName(bundles[1].getProcessData()), EntityType.PROCESS).size(); + InstanceUtil.getProcessInstanceListFromAllBundles(cluster3, bundles[1].getProcessName(), + EntityType.PROCESS).size(); Assert.assertEquals(finalNumberOfInstances, getExpectedNumberOfWorkflowInstances(oldStartTime, bundles[1].getProcessObject().getClusters().getClusters().get(0) http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java index 95de483..c9e373e 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/OptionalInputTest.java @@ -93,15 +93,15 @@ public class OptionalInputTest extends BaseTestClass { bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)"); bundles[0].setProcessConcurrency(2); - String process = bundles[0].getProcessData(); - LOGGER.info(Util.prettyPrintXml(process)); + ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); + LOGGER.info(Util.prettyPrintXml(process.toString())); bundles[0].submitAndScheduleBundle(prism, false); List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide("2010-01-02T00:50Z", "2010-01-02T01:10Z", 5); HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, inputPath + "/input1/", dataDates); - InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process), + InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); } @@ -198,8 +198,8 @@ public class OptionalInputTest extends BaseTestClass { bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)"); bundles[0].setProcessConcurrency(2); - String process = bundles[0].getProcessData(); - LOGGER.info(Util.prettyPrintXml(process)); + ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); + LOGGER.info(Util.prettyPrintXml(process.toString())); List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide( TimeUtil.addMinsToTime(startTime, -10), endTime, 5); @@ -210,7 +210,7 @@ public class OptionalInputTest extends BaseTestClass { HadoopUtil.recreateDir(clusterFS, inputPath + "/input0/" + date); } bundles[0].submitFeedsScheduleProcess(prism); - InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process), + InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); } @@ -232,11 +232,11 @@ public class OptionalInputTest extends BaseTestClass { LOGGER.info(Util.prettyPrintXml(bundles[0].getDataSets().get(i))); } - String process = bundles[0].getProcessData(); - LOGGER.info(Util.prettyPrintXml(process)); + ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); + LOGGER.info(Util.prettyPrintXml(process.toString())); bundles[0].submitAndScheduleBundle(prism, false); - InstanceUtil.waitTillInstanceReachState(oozieClient, Util.getProcessName(process), + InstanceUtil.waitTillInstanceReachState(oozieClient, process.getName(), 2, CoordinatorAction.Status.KILLED, EntityType.PROCESS); } @@ -262,9 +262,10 @@ public class OptionalInputTest extends BaseTestClass { bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)"); bundles[0].setProcessConcurrency(2); - String process = bundles[0].getProcessData(); - String processName = Util.getProcessName(process); - LOGGER.info(Util.prettyPrintXml(process)); + ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); + LOGGER.info(Util.prettyPrintXml(process.toString())); + String processName = process.getName(); + LOGGER.info(Util.prettyPrintXml(process.toString())); bundles[0].submitAndScheduleBundle(prism, true); InstanceUtil.waitTillInstanceReachState(oozieClient, processName, @@ -281,13 +282,13 @@ public class OptionalInputTest extends BaseTestClass { processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 0, 1); bundles[0].setProcessData(processMerlin.toString()); bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)"); - process = bundles[0].getProcessData(); - LOGGER.info("modified process:" + Util.prettyPrintXml(process)); + process = new ProcessMerlin(bundles[0].getProcessData()); + LOGGER.info("modified process:" + Util.prettyPrintXml(process.toString())); - prism.getProcessHelper().update(process, process); + prism.getProcessHelper().update(process.toString(), process.toString()); //from now on ... it should wait of input0 also - InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster, process.toString(), 0); InstanceUtil.waitTillInstanceReachState(oozieClient, processName, 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS, 10); HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE, @@ -318,9 +319,9 @@ public class OptionalInputTest extends BaseTestClass { bundles[0].setProcessInputStartEnd("now(0,-10)", "now(0,0)"); bundles[0].setProcessConcurrency(4); - String process = bundles[0].getProcessData(); - String processName = Util.getProcessName(process); - LOGGER.info(Util.prettyPrintXml(process)); + ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); + String processName = process.getName(); + LOGGER.info(Util.prettyPrintXml(process.toString())); bundles[0].submitAndScheduleBundle(prism, true); InstanceUtil.waitTillInstanceReachState(oozieClient, processName, @@ -336,14 +337,14 @@ public class OptionalInputTest extends BaseTestClass { final ProcessMerlin processMerlin = new ProcessMerlin(process); processMerlin.setProcessFeeds(bundles[0].getDataSets(), 2, 2, 1); bundles[0].setProcessData(processMerlin.toString()); - process = bundles[0].getProcessData(); + process = new ProcessMerlin(bundles[0].getProcessData()); //delete all input data HadoopUtil.deleteDirIfExists(inputPath + "/", clusterFS); bundles[0].setProcessInputNames("inputData0", "inputData"); - LOGGER.info("modified process:" + Util.prettyPrintXml(process)); + LOGGER.info("modified process:" + Util.prettyPrintXml(process.toString())); - prism.getProcessHelper().update(process, process); + prism.getProcessHelper().update(process.toString(), process.toString()); //from now on ... it should wait of input0 also InstanceUtil.waitTillInstanceReachState(oozieClient, processName, http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java index 9b8e9d3..e1a96f3 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java @@ -195,120 +195,118 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); //get 2 unique feeds - String feed01 = bundles[0].getInputFeedFromBundle(); - String feed02 = bundles[1].getInputFeedFromBundle(); - String outputFeed = bundles[0].getOutputFeedFromBundle(); + FeedMerlin feed01 = new FeedMerlin(bundles[0].getInputFeedFromBundle()); + FeedMerlin feed02 = new FeedMerlin(bundles[1].getInputFeedFromBundle()); + FeedMerlin outputFeed = new FeedMerlin(bundles[0].getOutputFeedFromBundle()); //set clusters to null; - feed01 = FeedMerlin.fromString(feed01).clearFeedClusters().toString(); - feed02 = FeedMerlin.fromString(feed02).clearFeedClusters().toString(); - outputFeed = FeedMerlin.fromString(outputFeed).clearFeedClusters().toString(); + feed01.clearFeedClusters(); + feed02.clearFeedClusters(); + outputFeed.clearFeedClusters(); //set new feed input data - feed01 = Util.setFeedPathValue(feed01, baseTestDir + "/feed01" + MINUTE_DATE_PATTERN); - feed02 = Util.setFeedPathValue(feed02, baseTestDir + "/feed02" + MINUTE_DATE_PATTERN); + feed01.setFeedPathValue(baseTestDir + "/feed01" + MINUTE_DATE_PATTERN); + feed02.setFeedPathValue(baseTestDir + "/feed02" + MINUTE_DATE_PATTERN); //generate data in both the colos ua1 and ua3 - String prefix = InstanceUtil.getFeedPrefix(feed01); + String prefix = InstanceUtil.getFeedPrefix(feed01.toString()); HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS); HadoopUtil.lateDataReplenish(cluster1FS, 25, 1, prefix, null); - prefix = InstanceUtil.getFeedPrefix(feed02); + prefix = InstanceUtil.getFeedPrefix(feed02.toString()); HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS); HadoopUtil.lateDataReplenish(cluster3FS, 25, 1, prefix, null); String startTime = TimeUtil.getTimeWrtSystemTime(-50); //set clusters for feed01 - feed01 = FeedMerlin.fromString(feed01).addFeedCluster( + feed01.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("hours(10)", ActionType.DELETE) .withValidity(startTime, "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) - .build()).toString(); + .build()); - feed01 = FeedMerlin.fromString(feed01).addFeedCluster( + feed01.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("hours(10)", ActionType.DELETE) .withValidity(startTime, "2099-01-01T00:00Z") .withClusterType(ClusterType.TARGET) - .build()).toString(); + .build()); //set clusters for feed02 - feed02 = FeedMerlin.fromString(feed02).addFeedCluster( + feed02.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("hours(10)", ActionType.DELETE) .withValidity(startTime, "2099-01-01T00:00Z") .withClusterType(ClusterType.TARGET) - .build()).toString(); + .build()); - feed02 = FeedMerlin.fromString(feed02).addFeedCluster( + feed02.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("hours(10)", ActionType.DELETE) .withValidity(startTime, "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) - .build()).toString(); + .build()); //set clusters for output feed - outputFeed = FeedMerlin.fromString(outputFeed).addFeedCluster( + outputFeed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("hours(10)", ActionType.DELETE) .withValidity(startTime, "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) - .build()).toString(); + .build()); - outputFeed = FeedMerlin.fromString(outputFeed).addFeedCluster( + outputFeed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("hours(10)", ActionType.DELETE) .withValidity(startTime, "2099-01-01T00:00Z") .withClusterType(ClusterType.TARGET) - .build()).toString(); + .build()); //submit and schedule feeds - prism.getFeedHelper().submitAndSchedule(feed01); - prism.getFeedHelper().submitAndSchedule(feed02); - prism.getFeedHelper().submitAndSchedule(outputFeed); + prism.getFeedHelper().submitAndSchedule(feed01.toString()); + prism.getFeedHelper().submitAndSchedule(feed02.toString()); + prism.getFeedHelper().submitAndSchedule(outputFeed.toString()); //create a process with 2 clusters //get a process - String process = bundles[0].getProcessData(); + ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData()); //add clusters to process String processStartTime = TimeUtil.getTimeWrtSystemTime(-6); String processEndTime = TimeUtil.getTimeWrtSystemTime(70); - process = ProcessMerlin.fromString(process).clearProcessCluster().toString(); + process.clearProcessCluster(); - process = ProcessMerlin.fromString(process).addProcessCluster( + process.addProcessCluster( new ProcessMerlin.ProcessClusterBuilder( Util.readEntityName(bundles[0].getClusters().get(0))) .withValidity(processStartTime, processEndTime) - .build() - ).toString(); + .build()); - process = ProcessMerlin.fromString(process).addProcessCluster( + process.addProcessCluster( new ProcessMerlin.ProcessClusterBuilder( Util.readEntityName(bundles[2].getClusters().get(0))) .withValidity(processStartTime, processEndTime) - .build() - ).toString(); - process = InstanceUtil.addProcessInputFeed(process, Util.readEntityName(feed02), - Util.readEntityName(feed02)); + .build()); + process = new ProcessMerlin(InstanceUtil.addProcessInputFeed(process.toString(), + feed02.toString(), feed02.getName())); //submit and schedule process - AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process)); + AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process.toString())); LOGGER.info("Wait till process goes into running "); int timeout = OSUtil.IS_WINDOWS ? 50 : 25; - InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(process), 1, + InstanceUtil.waitTillInstanceReachState(serverOC.get(0), process.getName(), 1, Status.RUNNING, EntityType.PROCESS, timeout); - InstanceUtil.waitTillInstanceReachState(serverOC.get(2), Util.getProcessName(process), 1, + InstanceUtil.waitTillInstanceReachState(serverOC.get(2), process.getName(), 1, Status.RUNNING, EntityType.PROCESS, timeout); - feed01 = InstanceUtil.setFeedFilePath(feed01, alternativeInputPath); - LOGGER.info("updated feed: " + Util.prettyPrintXml(feed01)); - AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed01, feed01)); + feed01 = new FeedMerlin(InstanceUtil.setFeedFilePath(feed01.toString(), alternativeInputPath)); + LOGGER.info("updated feed: " + Util.prettyPrintXml(feed01.toString())); + AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed01.toString(), feed01.toString())); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java index 7910fd7..483c281 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java @@ -113,105 +113,100 @@ public class PrismFeedUpdateTest extends BaseTestClass { AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster2Def)); //get 2 unique feeds - String feed01 = bundles[0].getInputFeedFromBundle(); - String outputFeed = bundles[0].getOutputFeedFromBundle(); + FeedMerlin feed01 = new FeedMerlin(bundles[0].getInputFeedFromBundle()); + FeedMerlin outputFeed = new FeedMerlin(bundles[0].getOutputFeedFromBundle()); /* set source and target for the 2 feeds */ //set clusters to null; - feed01 = FeedMerlin.fromString(feed01).clearFeedClusters().toString(); - outputFeed = FeedMerlin.fromString(outputFeed).clearFeedClusters().toString(); + feed01.clearFeedClusters(); + outputFeed.clearFeedClusters(); //set new feed input data - feed01 = Util.setFeedPathValue(feed01, baseTestDir + "/feed01" + MINUTE_DATE_PATTERN); + feed01.setFeedPathValue(baseTestDir + "/feed01" + MINUTE_DATE_PATTERN); //generate data in both the colos cluster1colo and cluster2colo - String prefix = InstanceUtil.getFeedPrefix(feed01); + String prefix = InstanceUtil.getFeedPrefix(feed01.toString()); String startTime = TimeUtil.getTimeWrtSystemTime(-40); System.out.println("Start time = " + startTime); HadoopUtil.deleteDirIfExists(prefix.substring(1), server1FS); HadoopUtil.lateDataReplenish(server1FS, 80, 20, prefix, null); //set clusters for feed01 - feed01 = FeedMerlin.fromString(feed01).addFeedCluster( + feed01.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(cluster1Def)) .withRetention("hours(10)", ActionType.DELETE) .withValidity(startTime, "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) - .build()).toString(); - feed01 = FeedMerlin.fromString(feed01).addFeedCluster( + .build()); + feed01.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(cluster2Def)) .withRetention("hours(10)", ActionType.DELETE) .withValidity(startTime, "2099-01-01T00:00Z") .withClusterType(ClusterType.TARGET) - .build()).toString(); + .build()); //set clusters for output feed - outputFeed = FeedMerlin.fromString(outputFeed).addFeedCluster( + outputFeed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(cluster1Def)) .withRetention("hours(10)", ActionType.DELETE) .withValidity(startTime, "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) - .build()).toString(); - outputFeed = FeedMerlin.fromString(outputFeed).addFeedCluster( + .build()); + outputFeed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(cluster2Def)) .withRetention("hours(10)", ActionType.DELETE) .withValidity(startTime, "2099-01-01T00:00Z") .withClusterType(ClusterType.TARGET) - .build()).toString(); + .build()); //submit and schedule feeds - LOGGER.info("feed01: " + Util.prettyPrintXml(feed01)); - LOGGER.info("outputFeed: " + Util.prettyPrintXml(outputFeed)); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed01)); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(outputFeed)); + LOGGER.info("feed01: " + Util.prettyPrintXml(feed01.toString())); + LOGGER.info("outputFeed: " + Util.prettyPrintXml(outputFeed.toString())); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed01.toString())); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(outputFeed.toString())); /* create 2 process with 2 clusters */ //get first process - String process01 = bundles[0].getProcessData(); + ProcessMerlin process01 = new ProcessMerlin(bundles[0].getProcessData()); //add clusters to process String processStartTime = TimeUtil.getTimeWrtSystemTime(-11); String processEndTime = TimeUtil.getTimeWrtSystemTime(70); - process01 = ProcessMerlin.fromString(process01).clearProcessCluster().toString(); - process01 = ProcessMerlin.fromString(process01).addProcessCluster( + process01.clearProcessCluster(); + process01.addProcessCluster( new ProcessMerlin.ProcessClusterBuilder(Util.readEntityName(cluster1Def)) .withValidity(processStartTime, processEndTime) - .build() - ).toString(); - process01 = ProcessMerlin.fromString(process01).addProcessCluster( + .build()); + process01.addProcessCluster( new ProcessMerlin.ProcessClusterBuilder(Util.readEntityName(cluster2Def)) .withValidity(processStartTime, processEndTime) - .build() - ).toString(); + .build()); //get 2nd process - String process02 = process01; - process02 = InstanceUtil - .setProcessName(process02, this.getClass().getSimpleName() - + "_zeroInputProcess" + new Random().nextInt()); + ProcessMerlin process02 = new ProcessMerlin(InstanceUtil + .setProcessName(process01.toString(), this.getClass().getSimpleName() + + "-zeroInputProcess" + new Random().nextInt())); List<String> feed = new ArrayList<String>(); - feed.add(outputFeed); - final ProcessMerlin processMerlin = new ProcessMerlin(process02); - processMerlin.setProcessFeeds(feed, 0, 0, 1); - process02 = processMerlin.toString(); + feed.add(outputFeed.toString()); + process02.setProcessFeeds(feed, 0, 0, 1); //submit and schedule both process - LOGGER.info("process: " + Util.prettyPrintXml(process01)); - LOGGER.info("process: " + Util.prettyPrintXml(process02)); - AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process01)); - AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process02)); + LOGGER.info("process: " + Util.prettyPrintXml(process01.toString())); + LOGGER.info("process: " + Util.prettyPrintXml(process02.toString())); + AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process01.toString())); + AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process02.toString())); LOGGER.info("Wait till process goes into running "); - InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(process01), 1, + InstanceUtil.waitTillInstanceReachState(cluster1OC, process01.getName(), 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 1); - InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(process02), 1, + InstanceUtil.waitTillInstanceReachState(cluster1OC, process02.getName(), 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 1); //change feed location path - outputFeed = Util.setFeedProperty(outputFeed, "queueName", "myQueue"); - LOGGER.info("updated feed: " + Util.prettyPrintXml(outputFeed)); + outputFeed.setFeedProperty("queueName", "myQueue"); + LOGGER.info("updated feed: " + Util.prettyPrintXml(outputFeed.toString())); //update feed first time - AssertUtil.assertSucceeded(prism.getFeedHelper().update(outputFeed, outputFeed)); + AssertUtil.assertSucceeded(prism.getFeedHelper().update(outputFeed.toString(), outputFeed.toString())); } /** http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java index 508cc08..7bc4b5b 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java @@ -18,6 +18,7 @@ package org.apache.falcon.regression.prism; +import org.apache.falcon.regression.Entities.ProcessMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.response.ServiceResponse; @@ -160,7 +161,7 @@ public class PrismSubmitTest extends BaseTestClass { AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0); AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, - Util.getProcessName(bundles[0].getProcessData()), 1); + new ProcessMerlin(bundles[0].getProcessData()).getName(), 1); AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0); Util.startService(cluster1.getClusterHelper()); @@ -179,7 +180,7 @@ public class PrismSubmitTest extends BaseTestClass { AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0); AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, - Util.getProcessName(bundles[0].getProcessData()), -1); + new ProcessMerlin(bundles[0].getProcessData()).getName(), -1); AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0); } @@ -219,9 +220,9 @@ public class PrismSubmitTest extends BaseTestClass { afterSubmitPrism = prism.getProcessHelper().getStoreInfo(); AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, - Util.getProcessName(bundles[0].getProcessData()), 1); + new ProcessMerlin(bundles[0].getProcessData()).getName(), 1); AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, - Util.getProcessName(bundles[0].getProcessData()), 1); + new ProcessMerlin(bundles[0].getProcessData()).getName(), 1); AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0); } @@ -566,7 +567,7 @@ public class PrismSubmitTest extends BaseTestClass { AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0); AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, - Util.getProcessName(bundles[0].getProcessData()), 1); + new ProcessMerlin(bundles[0].getProcessData()).getName(), 1); AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java index a716b83..272ac3b 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java @@ -18,6 +18,7 @@ package org.apache.falcon.regression.prism; +import org.apache.falcon.regression.Entities.ProcessMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Property; @@ -112,7 +113,7 @@ public class ProcessPartitionExpVariableTest extends BaseTestClass { + "/input1/", dataDates); InstanceUtil.waitTillInstanceReachState(clusterOC, - Util.getProcessName(bundles[0].getProcessData()), 2, + new ProcessMerlin(bundles[0].getProcessData()).getName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); } http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java index a363120..1d65d12 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java @@ -76,23 +76,19 @@ public class RescheduleKilledProcessTest extends BaseTestClass { public void rescheduleKilledProcess() throws Exception { String processStartTime = TimeUtil.getTimeWrtSystemTime(-11); String processEndTime = TimeUtil.getTimeWrtSystemTime(6); - String process = bundles[0].getProcessData(); - process = InstanceUtil.setProcessName(process, this.getClass().getSimpleName() - + "zeroInputProcess" + new Random().nextInt()); + ProcessMerlin process = new ProcessMerlin(InstanceUtil.setProcessName(bundles[0].getProcessData(), + this.getClass().getSimpleName() + "zeroInputProcess" + new Random().nextInt())); List<String> feed = new ArrayList<String>(); feed.add(bundles[0].getOutputFeedFromBundle()); - final ProcessMerlin processMerlin = new ProcessMerlin(process); - processMerlin.setProcessFeeds(feed, 0, 0, 1); - process = processMerlin.toString(); + process.setProcessFeeds(feed, 0, 0, 1); - process = ProcessMerlin.fromString(process).clearProcessCluster().toString(); - process = ProcessMerlin.fromString(process).addProcessCluster( + process.clearProcessCluster(); + process.addProcessCluster( new ProcessMerlin.ProcessClusterBuilder( Util.readEntityName(bundles[0].getClusters().get(0))) .withValidity(processStartTime, processEndTime) - .build() - ).toString(); - bundles[0].setProcessData(process); + .build()); + bundles[0].setProcessData(process.toString()); bundles[0].submitFeedsScheduleProcess(prism); http://git-wip-us.apache.org/repos/asf/falcon/blob/5b5113d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java index e578809..0cc0d6e 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java @@ -327,7 +327,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { Util.shutDownService(cluster2.getProcessHelper()); //add some property to feed so that new bundle is created - String updatedFeed = Util.setFeedProperty(feed, "someProp", "someVal"); + FeedMerlin updatedFeed = new FeedMerlin(feed).setFeedProperty("someProp", "someVal"); //save old data String oldBundleCluster1 = InstanceUtil.getLatestBundleID(cluster1, @@ -337,7 +337,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { //send update command with +5 mins in future String updateTime = TimeUtil.getTimeWrtSystemTime(5); - r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null); + r = prism.getFeedHelper().update(feed, updatedFeed.toString(), updateTime, null); AssertUtil.assertPartial(r); //verify new bundle creation on cluster1 and new definition on cluster3 @@ -353,7 +353,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { Util.readEntityName(feed), EntityType.FEED); //send update again - r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null); + r = prism.getFeedHelper().update(feed, updatedFeed.toString(), updateTime, null); AssertUtil.assertSucceeded(r); //verify new bundle creation on cluster2 and no new bundle on cluster1 @@ -446,11 +446,11 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { String oldBundleID = InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(feed), EntityType.FEED); String updateTime = TimeUtil.addMinsToTime(endTime, 60); - String updatedFeed = Util.setFeedProperty(feed, "someProp", "someVal"); + FeedMerlin updatedFeed = new FeedMerlin(feed).setFeedProperty("someProp", "someVal"); LOGGER.info("Original Feed : " + Util.prettyPrintXml(feed)); - LOGGER.info("Updated Feed :" + Util.prettyPrintXml(updatedFeed)); + LOGGER.info("Updated Feed :" + Util.prettyPrintXml(updatedFeed.toString())); LOGGER.info("Update Time : " + updateTime); - r = prism.getFeedHelper().update(feed, updatedFeed, updateTime, null); + r = prism.getFeedHelper().update(feed, updatedFeed.toString(), updateTime, null); AssertUtil.assertSucceeded(r); InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1);
