Repository: falcon Updated Branches: refs/heads/master 8c7eaa69f -> 395675fb0
http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java index 97d4e67..c6f72cc 100755 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java @@ -201,38 +201,38 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { String startTimeUA2 = "2012-10-01T12:10Z"; - String feed = bundles[0].getDataSets().get(0); - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.clearFeedClusters(); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA1, "2012-10-01T12:10Z") .withClusterType(ClusterType.SOURCE) .withPartition("") .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN) - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA2, "2012-10-01T12:25Z") .withClusterType(ClusterType.TARGET) .withPartition("") .withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN) - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) .withPartition("") - .build()).toString(); + .build()); - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString())); - ServiceResponse r = prism.getFeedHelper().submitEntity(feed); + ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString()); TimeUtil.sleepSeconds(10); AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source " + "is blank"); @@ -255,42 +255,39 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { String startTimeUA2 = "2012-10-01T12:00Z"; - String feed = bundles[0].getDataSets().get(0); - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.clearFeedClusters(); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("days(100000)", ActionType.DELETE) .withValidity(startTimeUA1, "2099-10-01T12:10Z") - .build()) - .toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) .withRetention("days(100000)", ActionType.DELETE) .withValidity(startTimeUA2, "2099-10-01T12:25Z") .withClusterType(ClusterType.TARGET) .withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN) - .build()) - .toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("days(100000)", ActionType.DELETE) .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) .withPartition("${cluster.colo}") .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN) - .build()) - .toString(); + .build()); - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString())); - ServiceResponse r = prism.getFeedHelper().submitEntity(feed); + ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString()); TimeUtil.sleepSeconds(10); AssertUtil.assertSucceeded(r); - r = prism.getFeedHelper().schedule(feed); + r = prism.getFeedHelper().schedule(feed.toString()); AssertUtil.assertSucceeded(r); TimeUtil.sleepSeconds(15); @@ -302,19 +299,19 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { HadoopUtil.copyDataToFolder(cluster3FS, testDirWithDate + "05/ua3/", testFile2); - InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2, + InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), + InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(), "REPLICATION"), 1); Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), + InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(), "RETENTION"), 1); Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed), + InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(), "RETENTION"), 1); Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed), + InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(), "RETENTION"), 1); @@ -356,53 +353,52 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { String startTimeUA2 = "2012-10-01T12:00Z"; - String feed = bundles[0].getDataSets().get(0); - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.clearFeedClusters(); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA1, "2099-10-01T12:10Z") - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA2, "2099-10-01T12:25Z") .withClusterType(ClusterType.TARGET) .withPartition("${cluster.colo}") .withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN) - .build()) - .toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN) - .build()).toString(); + .build()); - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString())); - ServiceResponse r = prism.getFeedHelper().submitAndSchedule(feed); + ServiceResponse r = prism.getFeedHelper().submitAndSchedule(feed.toString()); TimeUtil.sleepSeconds(10); AssertUtil.assertSucceeded(r); - InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2, + InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), + .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(), "REPLICATION"), 1); Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), + .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(), "RETENTION"), 1); Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed), + .checkIfFeedCoordExist(cluster1.getFeedHelper(), feed.getName(), "RETENTION"), 1); Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed), + .checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(), "RETENTION"), 1); @@ -449,48 +445,48 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { String startTimeUA2 = "2012-10-01T12:10Z"; - String feed = bundles[0].getDataSets().get(0); - feed = InstanceUtil.setFeedFilePath(feed, testBaseDir3 + MINUTE_DATE_PATTERN); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.setFilePath(testBaseDir3 + MINUTE_DATE_PATTERN); - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + feed.clearFeedClusters(); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA1, "2012-10-01T12:10Z") .withClusterType(ClusterType.TARGET) .withPartition("${cluster.colo}") - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA2, "2012-10-01T12:25Z") .withClusterType(ClusterType.TARGET) .withPartition("${cluster.colo}") - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) - .build()).toString(); + .build()); - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString())); - ServiceResponse r = prism.getFeedHelper().submitEntity(feed); + ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString()); TimeUtil.sleepSeconds(10); AssertUtil.assertSucceeded(r); - AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed)); + AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString())); TimeUtil.sleepSeconds(15); - InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1, + InstanceUtil.waitTillInstanceReachState(cluster1OC, feed.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); - InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 3, + InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 3, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); //check if data has been replicated correctly @@ -550,38 +546,37 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { String startTimeUA2 = "2012-10-01T12:10Z"; - String feed = bundles[0].getDataSets().get(0); - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.clearFeedClusters(); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA1, "2012-10-01T12:10Z") .withClusterType(ClusterType.SOURCE) .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN) - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA2, "2012-10-01T12:25Z") .withClusterType(ClusterType.TARGET) .withPartition("${cluster.colo}") .withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN) - .build()) - .toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) - .build()).toString(); + .build()); //clean target if old data exists - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString())); - ServiceResponse r = prism.getFeedHelper().submitEntity(feed); + ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString()); AssertUtil.assertFailed(r, "Submission of feed should have failed."); Assert.assertTrue(r.getMessage().contains( "Partition expression has to be specified for cluster " @@ -609,49 +604,46 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { String startTimeUA2 = "2012-10-01T12:10Z"; - String feed = bundles[0].getDataSets().get(0); - feed = InstanceUtil.setFeedFilePath(feed, - testBaseDir1 + MINUTE_DATE_PATTERN); - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.setFilePath(testBaseDir1 + MINUTE_DATE_PATTERN); + feed.clearFeedClusters(); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("days(10000000)", ActionType.DELETE) .withValidity(startTimeUA1, "2012-10-01T12:11Z") .withClusterType(ClusterType.TARGET) .withDataLocation(testBaseDir1 + "/ua1" + MINUTE_DATE_PATTERN) - .build()) - .toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) .withRetention("days(10000000)", ActionType.DELETE) .withValidity(startTimeUA2, "2012-10-01T12:26Z") .withClusterType(ClusterType.TARGET) .withDataLocation(testBaseDir1 + "/ua2" + MINUTE_DATE_PATTERN) - .build()) - .toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("days(10000000)", ActionType.DELETE) .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) .withPartition("${cluster.colo}") - .build()).toString(); + .build()); - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString())); - ServiceResponse r = prism.getFeedHelper().submitEntity(feed); + ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString()); TimeUtil.sleepSeconds(10); AssertUtil.assertSucceeded(r); - AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed)); + AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString())); TimeUtil.sleepSeconds(15); - InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1, + InstanceUtil.waitTillInstanceReachState(cluster1OC, feed.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); - InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2, + InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); //check if data has been replicated correctly @@ -716,47 +708,46 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { String startTimeUA1 = "2012-10-01T12:00Z"; String startTimeUA2 = "2012-10-01T12:00Z"; - String feed = bundles[0].getDataSets().get(0); - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.clearFeedClusters(); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA1, "2099-10-01T12:10Z") .withClusterType(ClusterType.SOURCE) .withPartition("${cluster.colo}") .withDataLocation(testBaseDirServer1Source + MINUTE_DATE_PATTERN) - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA2, "2099-10-01T12:25Z") .withClusterType(ClusterType.TARGET) .withDataLocation(testBaseDir2 + "/replicated" + MINUTE_DATE_PATTERN) - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) .withPartition("${cluster.colo}") .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN) - .build()) - .toString(); + .build()); - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString())); - ServiceResponse r = prism.getFeedHelper().submitEntity(feed); + ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString()); TimeUtil.sleepSeconds(10); AssertUtil.assertSucceeded(r); - r = prism.getFeedHelper().schedule(feed); + r = prism.getFeedHelper().schedule(feed.toString()); AssertUtil.assertSucceeded(r); TimeUtil.sleepSeconds(15); - InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2, + InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); //check if data has been replicated correctly @@ -804,49 +795,48 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { String startTimeUA1 = "2012-10-01T12:05Z"; String startTimeUA2 = "2012-10-01T12:10Z"; - String feed = bundles[0].getDataSets().get(0); - feed = InstanceUtil.setFeedFilePath(feed, testBaseDir1 + MINUTE_DATE_PATTERN); - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.setFilePath(testBaseDir1 + MINUTE_DATE_PATTERN); + feed.clearFeedClusters(); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA1, "2099-10-01T12:10Z") .withClusterType(ClusterType.TARGET) .withPartition("${cluster.colo}") .withDataLocation(testBaseDir1 + "/ua1" + MINUTE_DATE_PATTERN + "/") - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA2, "2099-10-01T12:25Z") .withClusterType(ClusterType.TARGET) .withPartition("${cluster.colo}") .withDataLocation(testBaseDir1 + "/ua2" + MINUTE_DATE_PATTERN + "/") - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) .withPartition("${cluster.colo}") .withDataLocation(testBaseDir4 + MINUTE_DATE_PATTERN + "/") - .build()) - .toString(); + .build()); - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString())); - ServiceResponse r = prism.getFeedHelper().submitEntity(feed); + ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString()); TimeUtil.sleepSeconds(10); AssertUtil.assertSucceeded(r); - AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed)); + AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString())); TimeUtil.sleepSeconds(15); - InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(feed), 1, + InstanceUtil.waitTillInstanceReachState(cluster1OC, feed.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); - InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2, + InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); //check if data has been replicated correctly @@ -905,38 +895,38 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { String startTimeUA1 = "2012-10-01T12:05Z"; String startTimeUA2 = "2012-10-01T12:10Z"; - String feed = bundles[0].getDataSets().get(0); - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.clearFeedClusters(); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA1, "2012-10-01T12:10Z") .withClusterType(ClusterType.SOURCE) .withPartition("") .withDataLocation(testBaseDir1 + MINUTE_DATE_PATTERN) - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTimeUA2, "2012-10-01T12:25Z") .withClusterType(ClusterType.TARGET) .withPartition("") .withDataLocation(testBaseDir2 + MINUTE_DATE_PATTERN) - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity("2012-10-01T12:00Z", "2099-01-01T00:00Z") .withClusterType(ClusterType.SOURCE) .withPartition("") - .build()).toString(); + .build()); - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString())); - ServiceResponse r = prism.getFeedHelper().submitEntity(feed); + ServiceResponse r = prism.getFeedHelper().submitEntity(feed.toString()); TimeUtil.sleepSeconds(10); AssertUtil.assertFailed(r, "is defined more than once for feed"); Assert.assertTrue(r.getMessage().contains("is defined more than once for feed")); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java index e1a96f3..6f60bb8 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java @@ -101,8 +101,8 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { bundles[0].setInputFeedDataPath(inputPath); Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); - String feed = bundles[0].getDataSets().get(0); - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.clearFeedClusters(); // use the colo string here so that the test works in embedded and distributed mode. String postFix = "/US/" + cluster2Colo; @@ -118,62 +118,62 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { String startTime = TimeUtil.getTimeWrtSystemTime(-30); - feed = FeedMerlin.fromString(feed) + feed .addFeedCluster(new FeedMerlin.FeedClusterBuilder( Util.readEntityName(bundles[1].getClusters().get(0))) .withRetention("hours(10)", ActionType.DELETE) .withValidity(startTime, TimeUtil.addMinsToTime(startTime, 85)) .withClusterType(ClusterType.SOURCE) .withPartition("US/${cluster.colo}") - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("hours(10)", ActionType.DELETE) .withValidity(TimeUtil.addMinsToTime(startTime, 20), TimeUtil.addMinsToTime(startTime, 105)) .withClusterType(ClusterType.TARGET) - .build()).toString(); + .build()); - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) .withRetention("hours(10)", ActionType.DELETE) .withValidity(TimeUtil.addMinsToTime(startTime, 40), TimeUtil.addMinsToTime(startTime, 130)) .withClusterType(ClusterType.SOURCE) .withPartition("UK/${cluster.colo}") - .build()).toString(); + .build()); - LOGGER.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed.toString())); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed)); - AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed)); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed.toString())); + AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed.toString())); //change feed location path - feed = InstanceUtil.setFeedFilePath(feed, alternativeInputPath); + feed.setFilePath(alternativeInputPath); - LOGGER.info("updated feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("updated feed: " + Util.prettyPrintXml(feed.toString())); //update feed - AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed, feed)); + AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString())); Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), - Util.readEntityName(feed), + Util.readEntityName(feed.toString()), "REPLICATION"), 0); Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster2.getFeedHelper(), - Util.readEntityName(feed), + Util.readEntityName(feed.toString()), "RETENTION"), 2); Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), - Util.readEntityName(feed), + Util.readEntityName(feed.toString()), "REPLICATION"), 0); Assert.assertEquals(InstanceUtil.checkIfFeedCoordExist(cluster3.getFeedHelper(), - Util.readEntityName(feed), + Util.readEntityName(feed.toString()), "RETENTION"), 2); Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed), + InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed.toString()), "REPLICATION"), 4); Assert.assertEquals( - InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed), + InstanceUtil.checkIfFeedCoordExist(cluster1.getFeedHelper(), Util.readEntityName(feed.toString()), "RETENTION"), 2); } @@ -209,11 +209,11 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { feed02.setFeedPathValue(baseTestDir + "/feed02" + MINUTE_DATE_PATTERN); //generate data in both the colos ua1 and ua3 - String prefix = InstanceUtil.getFeedPrefix(feed01.toString()); + String prefix = feed01.getFeedPrefix(); HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster1FS); HadoopUtil.lateDataReplenish(cluster1FS, 25, 1, prefix, null); - prefix = InstanceUtil.getFeedPrefix(feed02.toString()); + prefix = feed02.getFeedPrefix(); HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS); HadoopUtil.lateDataReplenish(cluster3FS, 25, 1, prefix, null); @@ -290,9 +290,9 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { new ProcessMerlin.ProcessClusterBuilder( Util.readEntityName(bundles[2].getClusters().get(0))) .withValidity(processStartTime, processEndTime) - .build()); - process = new ProcessMerlin(InstanceUtil.addProcessInputFeed(process.toString(), - feed02.toString(), feed02.getName())); + .build() + ); + process.addInputFeed(feed02.getName(), feed02.getName()); //submit and schedule process AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process.toString())); @@ -305,7 +305,7 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { InstanceUtil.waitTillInstanceReachState(serverOC.get(2), process.getName(), 1, Status.RUNNING, EntityType.PROCESS, timeout); - feed01 = new FeedMerlin(InstanceUtil.setFeedFilePath(feed01.toString(), alternativeInputPath)); + feed01.setFilePath(alternativeInputPath); LOGGER.info("updated feed: " + Util.prettyPrintXml(feed01.toString())); AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed01.toString(), feed01.toString())); } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java index 483c281..d855e33 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java @@ -125,7 +125,7 @@ public class PrismFeedUpdateTest extends BaseTestClass { feed01.setFeedPathValue(baseTestDir + "/feed01" + MINUTE_DATE_PATTERN); //generate data in both the colos cluster1colo and cluster2colo - String prefix = InstanceUtil.getFeedPrefix(feed01.toString()); + String prefix = feed01.getFeedPrefix(); String startTime = TimeUtil.getTimeWrtSystemTime(-40); System.out.println("Start time = " + startTime); HadoopUtil.deleteDirIfExists(prefix.substring(1), server1FS); @@ -183,9 +183,9 @@ public class PrismFeedUpdateTest extends BaseTestClass { .build()); //get 2nd process - ProcessMerlin process02 = new ProcessMerlin(InstanceUtil - .setProcessName(process01.toString(), this.getClass().getSimpleName() - + "-zeroInputProcess" + new Random().nextInt())); + ProcessMerlin process02 = new ProcessMerlin(process01); + process02.setName(this.getClass().getSimpleName() + "-zeroInputProcess" + + new Random().nextInt()); List<String> feed = new ArrayList<String>(); feed.add(outputFeed.toString()); process02.setProcessFeeds(feed, 0, 0, 1); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java index f1ff8fe..e2f01c5 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java @@ -106,14 +106,14 @@ public class PrismProcessDeleteTest extends BaseTestClass { //now ensure that data has been deleted from all cluster store and is present in the // cluster archives - String clusterName = Util.readEntityName(bundle.getProcessData()); + String processName = bundle.getProcessName(); //prism: - compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName); - compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName); + compareDataStoreStates(initialPrismStore, finalPrismStore, processName); + compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName); //UA1: - compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName); - compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName); + compareDataStoreStates(initialUA1Store, finalUA1Store, processName); + compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName); //UA2: compareDataStoresForEquality(initialUA2Store, finalUA2Store); @@ -157,7 +157,7 @@ public class PrismProcessDeleteTest extends BaseTestClass { //now ensure that data has been deleted from all cluster store and is present in the // cluster archives - String clusterName = Util.readEntityName(bundle.getProcessData()); + String processName = bundle.getProcessName(); //prism: compareDataStoresForEquality(initialPrismStore, finalPrismStore); compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore); @@ -177,16 +177,16 @@ public class PrismProcessDeleteTest extends BaseTestClass { HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS); - compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName); + compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), processName); compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore, - clusterName); + processName); compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store")); compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive")); - compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName); + compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), processName); compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore, - clusterName); + processName); } catch (Exception e) { LOGGER.info(e.getMessage()); throw new TestNGException(e.getMessage()); @@ -290,18 +290,18 @@ public class PrismProcessDeleteTest extends BaseTestClass { //now ensure that data has been deleted from all cluster store and is present in the // cluster archives - String clusterName = Util.readEntityName(bundle.getProcessData()); + String processName = bundle.getProcessName(); //prism: - compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName); - compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName); + compareDataStoreStates(initialPrismStore, finalPrismStore, processName); + compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName); //UA2: compareDataStoresForEquality(initialUA2Store, finalUA2Store); compareDataStoresForEquality(initialUA2ArchiveStore, finalUA2ArchiveStore); //UA1: - compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName); - compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName); + compareDataStoreStates(initialUA1Store, finalUA1Store, processName); + compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName); } catch (Exception e) { LOGGER.info(e.getMessage()); @@ -455,14 +455,14 @@ public class PrismProcessDeleteTest extends BaseTestClass { //now ensure that data has been deleted from all cluster store and is present in the // cluster archives - String clusterName = Util.readEntityName(bundle.getProcessData()); + String processName = bundle.getProcessName(); //prism: - compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName); - compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName); + compareDataStoreStates(initialPrismStore, finalPrismStore, processName); + compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName); //UA1: - compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName); - compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName); + compareDataStoreStates(initialUA1Store, finalUA1Store, processName); + compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName); //UA2: compareDataStoresForEquality(initialUA2Store, finalUA2Store); @@ -506,14 +506,14 @@ public class PrismProcessDeleteTest extends BaseTestClass { //now ensure that data has been deleted from all cluster store and is present in the // cluster archives - String clusterName = Util.readEntityName(bundles[0].getProcessData()); + String processName = bundles[0].getProcessName(); //prism: - compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName); - compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName); + compareDataStoreStates(initialPrismStore, finalPrismStore, processName); + compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName); //UA1: - compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName); - compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName); + compareDataStoreStates(initialUA1Store, finalUA1Store, processName); + compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName); //UA2: compareDataStoresForEquality(initialUA2Store, finalUA2Store); @@ -559,14 +559,14 @@ public class PrismProcessDeleteTest extends BaseTestClass { //now ensure that data has been deleted from all cluster store and is present in the // cluster archives - String clusterName = Util.readEntityName(bundle.getProcessData()); + String processName = bundle.getProcessName(); //prism: - compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName); - compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName); + compareDataStoreStates(initialPrismStore, finalPrismStore, processName); + compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName); //UA1: - compareDataStoreStates(initialUA1Store, finalUA1Store, clusterName); - compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName); + compareDataStoreStates(initialUA1Store, finalUA1Store, processName); + compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, processName); //UA2: compareDataStoresForEquality(initialUA2Store, finalUA2Store); @@ -672,7 +672,7 @@ public class PrismProcessDeleteTest extends BaseTestClass { //now ensure that data has been deleted from all cluster store and is present in the // cluster archives - String clusterName = Util.readEntityName(bundles[0].getProcessData()); + String processName = bundles[0].getProcessName(); //prism: compareDataStoresForEquality(initialPrismStore, finalPrismStore); compareDataStoresForEquality(finalPrismArchiveStore, initialPrismArchiveStore); @@ -694,13 +694,13 @@ public class PrismProcessDeleteTest extends BaseTestClass { compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store")); compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive")); - compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName); + compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), processName); compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore, - clusterName); + processName); - compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName); + compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), processName); compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore, - clusterName); + processName); } catch (Exception e) { e.printStackTrace(); @@ -752,18 +752,18 @@ public class PrismProcessDeleteTest extends BaseTestClass { //now ensure that data has been deleted from all cluster store and is present in the // cluster archives - String clusterName = Util.readEntityName(bundle.getProcessData()); + String processName = bundle.getProcessName(); //prism: - compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName); - compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName); + compareDataStoreStates(initialPrismStore, finalPrismStore, processName); + compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName); //UA1: compareDataStoresForEquality(initialUA1Store, finalUA1Store); compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore); //UA2: - compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName); - compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName); + compareDataStoreStates(initialUA2Store, finalUA2Store, processName); + compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName); } catch (Exception e) { e.printStackTrace(); throw new TestNGException(e.getMessage()); @@ -817,18 +817,18 @@ public class PrismProcessDeleteTest extends BaseTestClass { //now ensure that data has been deleted from all cluster store and is present in the // cluster archives - String clusterName = Util.readEntityName(bundle.getProcessData()); + String processName = bundle.getProcessName(); //prism: - compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName); - compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName); + compareDataStoreStates(initialPrismStore, finalPrismStore, processName); + compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName); //UA1: compareDataStoresForEquality(initialUA1Store, finalUA1Store); compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore); //UA2: - compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName); - compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName); + compareDataStoreStates(initialUA2Store, finalUA2Store, processName); + compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName); } catch (Exception e) { e.printStackTrace(); throw new TestNGException(e.getMessage()); @@ -876,18 +876,18 @@ public class PrismProcessDeleteTest extends BaseTestClass { //now ensure that data has been deleted from all cluster store and is present in the // cluster archives - String clusterName = Util.readEntityName(bundles[1].getProcessData()); + String processName = bundles[1].getProcessName(); //prism: - compareDataStoreStates(initialPrismStore, finalPrismStore, clusterName); - compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, clusterName); + compareDataStoreStates(initialPrismStore, finalPrismStore, processName); + compareDataStoreStates(finalPrismArchiveStore, initialPrismArchiveStore, processName); //UA1: compareDataStoresForEquality(initialUA1Store, finalUA1Store); compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore); //UA2: - compareDataStoreStates(initialUA2Store, finalUA2Store, clusterName); - compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, clusterName); + compareDataStoreStates(initialUA2Store, finalUA2Store, processName); + compareDataStoreStates(finalUA2ArchiveStore, initialUA2ArchiveStore, processName); Util.startService(cluster2.getClusterHelper()); @@ -897,18 +897,18 @@ public class PrismProcessDeleteTest extends BaseTestClass { HashMap<String, List<String>> systemPostUp = getSystemState(EntityType.PROCESS); - clusterName = Util.readEntityName(bundles[0].getProcessData()); + processName = bundles[0].getProcessName(); compareDataStoresForEquality(finalUA2Store, systemPostUp.get("ua2Store")); compareDataStoresForEquality(finalUA2ArchiveStore, systemPostUp.get("ua2Archive")); - compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), clusterName); + compareDataStoreStates(finalPrismStore, systemPostUp.get("prismStore"), processName); compareDataStoreStates(systemPostUp.get("prismArchive"), finalPrismArchiveStore, - clusterName); + processName); - compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), clusterName); + compareDataStoreStates(finalUA1Store, systemPostUp.get("ua1Store"), processName); compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore, - clusterName); + processName); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java index 03f380d..4555221 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java @@ -335,9 +335,9 @@ public class PrismProcessScheduleTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0); OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, - Util.readEntityName(bundles[0].getProcessData()), 0); + bundles[0].getProcessName(), 0); InstanceUtil.waitTillInstanceReachState(cluster1OC, - Util.readEntityName(bundles[0].getProcessData()), 2, + bundles[0].getProcessName(), 2, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 5); InstanceUtil.waitForBundleToReachState(cluster1, http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java index dfb405f..4aa7189 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java @@ -179,7 +179,7 @@ public class PrismProcessSnSTest extends BaseTestClass { //reschedule trial AssertUtil.assertSucceeded(cluster2.getProcessHelper().schedule(bundles[0].getProcessData())); Assert.assertEquals(OozieUtil.getBundles(cluster2.getFeedHelper().getOozieClient(), - Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).size(), 1); + bundles[0].getProcessName(), EntityType.PROCESS).size(), 1); AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING); } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java index 7bc4b5b..f90a76b 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java @@ -18,7 +18,6 @@ package org.apache.falcon.regression.prism; -import org.apache.falcon.regression.Entities.ProcessMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.response.ServiceResponse; @@ -160,8 +159,7 @@ public class PrismSubmitTest extends BaseTestClass { List<String> afterSubmitPrism = prism.getProcessHelper().getStoreInfo(); AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0); - AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, - new ProcessMerlin(bundles[0].getProcessData()).getName(), 1); + AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), 1); AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0); Util.startService(cluster1.getClusterHelper()); @@ -179,8 +177,7 @@ public class PrismSubmitTest extends BaseTestClass { afterSubmitPrism = prism.getProcessHelper().getStoreInfo(); AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0); - AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, - new ProcessMerlin(bundles[0].getProcessData()).getName(), -1); + AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), -1); AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0); } @@ -219,10 +216,8 @@ public class PrismSubmitTest extends BaseTestClass { afterSubmitCluster2 = cluster2.getProcessHelper().getStoreInfo(); afterSubmitPrism = prism.getProcessHelper().getStoreInfo(); - AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, - new ProcessMerlin(bundles[0].getProcessData()).getName(), 1); - AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, - new ProcessMerlin(bundles[0].getProcessData()).getName(), 1); + AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, bundles[0].getProcessName(), 1); + AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), 1); AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0); } @@ -566,8 +561,7 @@ public class PrismSubmitTest extends BaseTestClass { afterSubmitPrism = prism.getProcessHelper().getStoreInfo(); AssertUtil.compareDataStoreStates(beforeSubmitCluster1, afterSubmitCluster1, 0); - AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, - new ProcessMerlin(bundles[0].getProcessData()).getName(), 1); + AssertUtil.compareDataStoreStates(beforeSubmitPrism, afterSubmitPrism, bundles[0].getProcessName(), 1); AssertUtil.compareDataStoreStates(beforeSubmitCluster2, afterSubmitCluster2, 0); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java index 272ac3b..f407601 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/ProcessPartitionExpVariableTest.java @@ -18,7 +18,6 @@ package org.apache.falcon.regression.prism; -import org.apache.falcon.regression.Entities.ProcessMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Property; @@ -112,8 +111,7 @@ public class ProcessPartitionExpVariableTest extends BaseTestClass { HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, baseTestDir + "/input1/", dataDates); - InstanceUtil.waitTillInstanceReachState(clusterOC, - new ProcessMerlin(bundles[0].getProcessData()).getName(), 2, + InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java index 1d65d12..2a73538 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleKilledProcessTest.java @@ -23,7 +23,6 @@ import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.util.AssertUtil; import org.apache.falcon.regression.core.util.BundleUtil; -import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; @@ -76,8 +75,9 @@ public class RescheduleKilledProcessTest extends BaseTestClass { public void rescheduleKilledProcess() throws Exception { String processStartTime = TimeUtil.getTimeWrtSystemTime(-11); String processEndTime = TimeUtil.getTimeWrtSystemTime(6); - ProcessMerlin process = new ProcessMerlin(InstanceUtil.setProcessName(bundles[0].getProcessData(), - this.getClass().getSimpleName() + "zeroInputProcess" + new Random().nextInt())); + ProcessMerlin process = bundles[0].getProcessObject(); + process.setName(this.getClass().getSimpleName() + "-zeroInputProcess" + + new Random().nextInt()); List<String> feed = new ArrayList<String>(); feed.add(bundles[0].getOutputFeedFromBundle()); process.setProcessFeeds(feed, 0, 0, 1); @@ -87,7 +87,8 @@ public class RescheduleKilledProcessTest extends BaseTestClass { new ProcessMerlin.ProcessClusterBuilder( Util.readEntityName(bundles[0].getClusters().get(0))) .withValidity(processStartTime, processEndTime) - .build()); + .build() + ); bundles[0].setProcessData(process.toString()); bundles[0].submitFeedsScheduleProcess(prism); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java index 7e4422b..8d7ac7f 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RescheduleProcessInFinalStatesTest.java @@ -29,7 +29,6 @@ import org.apache.falcon.regression.core.util.HadoopUtil; import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; import org.apache.falcon.regression.core.util.TimeUtil; -import org.apache.falcon.regression.core.util.Util; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.security.authentication.client.AuthenticationException; @@ -159,7 +158,7 @@ public class RescheduleProcessInFinalStatesTest extends BaseTestClass { InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS); prism.getProcessHelper() - .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()), + .getProcessInstanceKill(bundles[0].getProcessName(), "?start=2010-01-02T01:05Z&end=2010-01-02T01:11Z"); InstanceUtil .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.DONEWITHERROR); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java index 0cc0d6e..14d76af 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java @@ -126,9 +126,9 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0); //update frequency - Frequency f = new Frequency("" + 21, Frequency.TimeUnit.minutes); - String updatedFeed = InstanceUtil.setFeedFrequency(feed, f); - ServiceResponse r = prism.getFeedHelper().update(feed, updatedFeed, "abc", null); + FeedMerlin updatedFeed = new FeedMerlin(feed); + updatedFeed.setFrequency(new Frequency("21", Frequency.TimeUnit.minutes)); + ServiceResponse r = prism.getFeedHelper().update(feed, updatedFeed.toString(), "abc", null); Assert.assertTrue(r.getMessage() .contains("java.lang.IllegalArgumentException: abc is not a valid UTC string")); } @@ -182,9 +182,9 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 0); //update frequency - Frequency f = new Frequency("" + 7, Frequency.TimeUnit.minutes); - String updatedFeed = InstanceUtil.setFeedFrequency(feed, f); - r = prism.getFeedHelper().update(feed, updatedFeed, + FeedMerlin updatedFeed = new FeedMerlin(feed); + updatedFeed.setFrequency(new Frequency("7", Frequency.TimeUnit.minutes)); + r = prism.getFeedHelper().update(feed, updatedFeed.toString(), TimeUtil.getTimeWrtSystemTime(-10000), null); AssertUtil.assertSucceeded(r); InstanceUtil.waitTillInstancesAreCreated(cluster1, feed, 1); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java index 4c54409..040057e 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java @@ -141,7 +141,7 @@ public class ProcessUITest extends BaseUITestClass { Generator.getHadoopPathGenerator(feedInputPath, MINUTE_DATE_PATTERN)); int j = 0; for (FeedMerlin feed : inputFeeds) { - bundles[0].addInputFeedToBundle("inputFeed" + j, feed.toString(), j++); + bundles[0].addInputFeedToBundle("inputFeed" + j++, feed); } outputFeeds = LineageApiTest.generateFeeds(numOutputFeeds, outputMerlin, @@ -150,7 +150,7 @@ public class ProcessUITest extends BaseUITestClass { Generator.getHadoopPathGenerator(feedOutputPath, MINUTE_DATE_PATTERN)); j = 0; for (FeedMerlin feed : outputFeeds) { - bundles[0].addOutputFeedToBundle("outputFeed" + j, feed.toString(), j++); + bundles[0].addOutputFeedToBundle("outputFeed" + j++, feed); } AssertUtil.assertSucceeded(bundles[0].submitBundle(prism));
