http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java index 1cc558e..2f83483 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedLateReplicationTest.java @@ -18,6 +18,7 @@ package org.apache.falcon.regression.prism; +import org.apache.falcon.regression.Entities.FeedMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.feed.ActionType; @@ -29,7 +30,6 @@ import org.apache.falcon.regression.core.util.InstanceUtil; import org.apache.falcon.regression.core.util.OSUtil; import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; -import org.apache.falcon.regression.core.util.XmlUtil; import org.apache.falcon.regression.testHelper.BaseTestClass; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Logger; @@ -45,19 +45,22 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.List; +/** + * Late replication test with prism server. + */ @Test(groups = "distributed") public class PrismFeedLateReplicationTest extends BaseTestClass { - ColoHelper cluster1 = servers.get(0); - ColoHelper cluster2 = servers.get(1); - ColoHelper cluster3 = servers.get(2); - FileSystem cluster1FS = serverFS.get(0); - FileSystem cluster2FS = serverFS.get(1); - FileSystem cluster3FS = serverFS.get(2); + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private ColoHelper cluster3 = servers.get(2); + private FileSystem cluster1FS = serverFS.get(0); + private FileSystem cluster2FS = serverFS.get(1); + private FileSystem cluster3FS = serverFS.get(2); private String baseTestDir = baseHDFSDir + "/PrismFeedLateReplicationTest"; private String inputPath = baseTestDir + "/input-data" + MINUTE_DATE_PATTERN; private String aggregateWorkflowDir = baseTestDir + "/aggregator"; - private static final Logger logger = Logger.getLogger(PrismFeedLateReplicationTest.class); + private static final Logger LOGGER = Logger.getLogger(PrismFeedLateReplicationTest.class); @BeforeClass(alwaysRun = true) public void uploadWorkflow() throws Exception { @@ -66,7 +69,7 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); Bundle bundle = BundleUtil.readELBundle(); for (int i = 0; i < 3; i++) { bundles[i] = new Bundle(bundle, servers.get(i)); @@ -81,16 +84,13 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { } @Test(groups = {"multiCluster"}) - public void multipleSourceOneTarget_pastData() throws Exception { + public void multipleSourceOneTargetPastData() throws Exception { bundles[0].setInputFeedDataPath(inputPath); Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); String feed = bundles[0].getDataSets().get(0); - feed = 
InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), null, - ClusterType.SOURCE, null); + feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); String postFix = "/US/" + cluster2.getClusterHelper().getColoName(); String prefix = bundles[0].getFeedDataPathPrefix(); @@ -105,26 +105,31 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { String startTime = TimeUtil.getTimeWrtSystemTime(-30); - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE, - "US/${cluster.colo}"); + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.SOURCE) + .withPartition("US/${cluster.colo}") + .build()).toString(); - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, - null); + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.TARGET) + .build()).toString(); - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, - "UK/${cluster.colo}"); + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.SOURCE) + .withPartition("UK/${cluster.colo}") + .build()).toString(); - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); prism.getFeedHelper().submitAndSchedule(feed); TimeUtil.sleepSeconds(10); @@ -138,10 +143,9 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { for (int i = 0; i < 30; i++) { if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), - 0) - == WorkflowJob.Status.SUCCEEDED + 0) == WorkflowJob.Status.SUCCEEDED && InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(1), 0) + replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; } @@ -164,40 +168,41 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { } @Test(groups = {"multiCluster"}) - public void multipleSourceOneTarget_futureData() throws Exception { + public void multipleSourceOneTargetFutureData() throws Exception { bundles[0].setInputFeedDataPath(inputPath); Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); String feed = bundles[0].getDataSets().get(0); - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), null, - ClusterType.SOURCE, null); - + feed = 
FeedMerlin.fromString(feed).clearFeedClusters().toString(); String startTime = TimeUtil.getTimeWrtSystemTime(3); - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE, - "US/${cluster.colo}"); + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.SOURCE) + .withPartition("US/${cluster.colo}") + .build()).toString(); - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, - null); + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.TARGET) + .build()).toString(); - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, - "UK/${cluster.colo}"); + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.SOURCE) + .withPartition("UK/${cluster.colo}") + .build()).toString(); - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); prism.getFeedHelper().submitAndSchedule(feed); TimeUtil.sleepSeconds(10); @@ -223,32 +228,29 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { for (int i = 0; i < 30; i++) { if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), - 0) - == WorkflowJob.Status.SUCCEEDED + 0) == WorkflowJob.Status.SUCCEEDED && InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(1), 0) + replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; } - logger.info("still in for loop"); + LOGGER.info("still in for loop"); TimeUtil.sleepSeconds(20); } Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(0), 0), + replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED); Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(1), 0), + replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED); TimeUtil.sleepSeconds(15); List<String> inputFolderListForColo1 = InstanceUtil - .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), - 1); + .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 1); List<String> inputFolderListForColo2 = InstanceUtil - .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1), - 1); + .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1), 1); HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT, 
inputFolderListForColo1); @@ -269,21 +271,20 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { //wait for lates run to complete for (int i = 0; i < 30; i++) { if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), - 0) - == WorkflowJob.Status.SUCCEEDED + 0) == WorkflowJob.Status.SUCCEEDED && InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(1), 0) + replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; } - logger.info("still in for loop"); + LOGGER.info("still in for loop"); TimeUtil.sleepSeconds(20); } Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(0), 0), + replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED); Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(1), 0), + replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED); TimeUtil.sleepSeconds(30); @@ -307,7 +308,7 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { } /** - * this test case does the following + * this test case does the following. * two source ua2 and ua3 * ua3 has following part data * ua1/ua2 @@ -325,74 +326,61 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { * <p/> * after first late succeed data is put into other source and late should not */ - @Test(groups = {"multiCluster"}) public void mixedTest01() throws Exception { - bundles[0].setInputFeedDataPath(inputPath); Bundle.submitCluster(bundles[0], bundles[1], bundles[2]); - - String feed = bundles[0].getDataSets().get(0); - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), null, - ClusterType.SOURCE, null); - - - String startTime = TimeUtil.getTimeWrtSystemTime(3); - - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE, - "ua1/${cluster.colo}"); - - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, - null); - - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, - "ua1/${cluster.colo}"); + feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + final String startTime = TimeUtil.getTimeWrtSystemTime(3); + + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.SOURCE) + .withPartition("ua1/${cluster.colo}") + .build()).toString(); + + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.TARGET) + .build()).toString(); + + feed = FeedMerlin.fromString(feed).addFeedCluster( + new 
FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.SOURCE) + .withPartition("ua1/${cluster.colo}") + .build()).toString(); //create data in colos - - String postFix = "/ua1/ua2"; String prefix = bundles[0].getFeedDataPathPrefix(); HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster2FS); + String postFix = "/ua1/ua2"; HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix); - postFix = "/ua2/ua2"; HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix); - postFix = "/ua3/ua2"; HadoopUtil.lateDataReplenishWithoutSuccess(cluster2FS, 90, 1, prefix, postFix); //put _SUCCESS in parent folder UA2 HadoopUtil.putFileInFolderHDFS(cluster2FS, 90, 1, prefix, "_SUCCESS"); - - postFix = "/ua1/ua3"; HadoopUtil.deleteDirIfExists(prefix.substring(1), cluster3FS); + postFix = "/ua1/ua3"; HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix); - postFix = "/ua2/ua3"; HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix); - postFix = "/ua3/ua3"; HadoopUtil.lateDataReplenish(cluster3FS, 90, 1, prefix, postFix); //put _SUCCESS in parent folder of UA3 HadoopUtil.putFileInFolderHDFS(cluster3FS, 90, 1, prefix, "_SUCCESS"); - //submit and schedule feed - logger.info("feed: " + Util.prettyPrintXml(feed)); - - prism.getFeedHelper().submitAndSchedule(feed); + prism.getFeedHelper().submitAndSchedule(feed); //submit and schedule feed TimeUtil.sleepSeconds(10); //wait till 1st instance of replication coord is SUCCEEDED @@ -404,22 +392,21 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { for (int i = 0; i < 30; i++) { if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), - 0) - == WorkflowJob.Status.SUCCEEDED + 0) == WorkflowJob.Status.SUCCEEDED && InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(1), 0) + replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; } - logger.info("still in for loop"); + LOGGER.info("still in for loop"); TimeUtil.sleepSeconds(20); } Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED, + replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED, "Replication job should have succeeded."); Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED, + replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED, "Replication job should have succeeded."); TimeUtil.sleepSeconds(15); @@ -427,29 +414,23 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { //check for exact folders to be created in ua1 : ua1/ua2 and ua1/ua3 no other should // be present. 
both of them should have _success - List<String> inputFolderListForColo1 = InstanceUtil - .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), - 1); + .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 1); List<String> inputFolderListForColo2 = InstanceUtil - .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1), - 1); + .getInputFoldersForInstanceForReplication(cluster1, replicationCoordIDTarget.get(1), 1); String outPutLocation = InstanceUtil - .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), - 0); + .getOutputFolderForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 0); String outPutBaseLocation = InstanceUtil .getOutputFolderBaseForInstanceForReplication(cluster1, replicationCoordIDTarget.get(0), 0); - List<String> subfolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation); - - Assert.assertEquals(subfolders.size(), 1); - Assert.assertEquals(subfolders.get(0), "ua1"); + List<String> subFolders = HadoopUtil.getHDFSSubFoldersName(cluster1FS, outPutBaseLocation); + Assert.assertEquals(subFolders.size(), 1); + Assert.assertEquals(subFolders.get(0), "ua1"); Assert.assertFalse(HadoopUtil.isFilePresentHDFS(cluster1FS, outPutBaseLocation, "_SUCCESS")); - Assert.assertTrue(HadoopUtil.isFilePresentHDFS(cluster1FS, outPutLocation, "_SUCCESS")); HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.NORMAL_INPUT, @@ -462,30 +443,24 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { //check for run id to be 1 Assert.assertTrue( - InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) == - 1 + InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) + == 1 && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0) == 1, "id have to be equal 1"); - - //wait for lates run to complete + //wait for latest run to complete for (int i = 0; i < 30; i++) { if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), - 0) - == WorkflowJob.Status.SUCCEEDED + 0) == WorkflowJob.Status.SUCCEEDED && InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(1), 0) - == WorkflowJob.Status.SUCCEEDED) { + replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; } - logger.info("still in for loop"); + LOGGER.info("still in for loop"); TimeUtil.sleepSeconds(20); } - - TimeUtil.sleepSeconds(30); - //put data for the second time HadoopUtil.flattenAndPutDataInFolder(cluster2FS, OSUtil.OOZIE_EXAMPLE_INPUT_DATA + "2ndLateData", inputFolderListForColo1); @@ -494,11 +469,10 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { //sleep till late 2 starts TimeUtil.sleepTill(TimeUtil.addMinsToTime(startTime, 9)); - //check for run id to be 2 Assert.assertTrue( - InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) == - 2 + InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) + == 2 && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0) == 2, "id have to be equal 2"); @@ -510,7 +484,7 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { * <p/> * this test case does the following * two source ua2 and ua3 - * ua3 has follwing part data + * ua3 has following part data * ua1/ua2 * ua1/ua2 * ua1/ua2 @@ -535,30 +509,32 @@ public class PrismFeedLateReplicationTest extends 
BaseTestClass { //get feed String feed = bundles[0].getDataSets().get(0); - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), null, - ClusterType.SOURCE, null); + feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); String startTime = TimeUtil.getTimeWrtSystemTime(3); - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE, - "ua1/${cluster.colo}"); - - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET, - null); - - feed = InstanceUtil.setFeedCluster(feed, - XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"), - XmlUtil.createRetention("hours(10)", ActionType.DELETE), - Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, - "ua1/${cluster.colo}"); + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.SOURCE) + .withPartition("ua1/${cluster.colo}") + .build()).toString(); + + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.TARGET) + .build()).toString(); + + feed = FeedMerlin.fromString(feed).addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) + .withRetention("hours(10)", ActionType.DELETE) + .withValidity(startTime, "2099-01-01T00:00Z") + .withClusterType(ClusterType.SOURCE) + .withPartition("ua1/${cluster.colo}") + .build()).toString(); //create data in colos @@ -592,7 +568,7 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { TimeUtil.sleepSeconds(15); //submit and schedule feed - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); prism.getFeedHelper().submitAndSchedule(feed); TimeUtil.sleepSeconds(10); @@ -606,23 +582,21 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { for (int i = 0; i < 30; i++) { if (InstanceUtil.getInstanceStatusFromCoord(cluster1, replicationCoordIDTarget.get(0), - 0) - == WorkflowJob.Status.SUCCEEDED + 0) == WorkflowJob.Status.SUCCEEDED && InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(1), 0) - == WorkflowJob.Status.SUCCEEDED) { + replicationCoordIDTarget.get(1), 0) == WorkflowJob.Status.SUCCEEDED) { break; } - logger.info("still in for loop"); + LOGGER.info("still in for loop"); TimeUtil.sleepSeconds(20); } Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED, + replicationCoordIDTarget.get(0), 0), WorkflowJob.Status.SUCCEEDED, "Replication job did not succeed"); Assert.assertEquals(InstanceUtil.getInstanceStatusFromCoord(cluster1, - replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED, + replicationCoordIDTarget.get(1), 0), WorkflowJob.Status.SUCCEEDED, 
"Replication job did not succeed"); TimeUtil.sleepSeconds(15); @@ -664,8 +638,8 @@ public class PrismFeedLateReplicationTest extends BaseTestClass { //check for run id to be 1 Assert.assertTrue( - InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) == - 1 + InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(0), 0) + == 1 && InstanceUtil.getInstanceRunIdFromCoord(cluster1, replicationCoordIDTarget.get(1), 0) == 1, "id have to be equal 1");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java index ed2d9d7..bde4e65 100755 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationPartitionExpTest.java @@ -50,27 +50,30 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; +/** + * Replication feed with partitions as expression language variables tests. + */ @Test(groups = "distributed") public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { - ColoHelper cluster1 = servers.get(0); - ColoHelper cluster2 = servers.get(1); - ColoHelper cluster3 = servers.get(2); - FileSystem cluster1FS = serverFS.get(0); - FileSystem cluster2FS = serverFS.get(1); - FileSystem cluster3FS = serverFS.get(2); - OozieClient cluster1OC = serverOC.get(0); - OozieClient cluster2OC = serverOC.get(1); + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private ColoHelper cluster3 = servers.get(2); + private FileSystem cluster1FS = serverFS.get(0); + private FileSystem cluster2FS = serverFS.get(1); + private FileSystem cluster3FS = serverFS.get(2); + private OozieClient cluster1OC = serverOC.get(0); + private OozieClient cluster2OC = serverOC.get(1); private String testDate = "/2012/10/01/12/"; private String baseTestDir = baseHDFSDir + "/PrismFeedReplicationPartitionExpTest"; private String testBaseDir1 = baseTestDir + "/localDC/rc/billing"; private String testBaseDir2 = baseTestDir + "/clusterPath/localDC/rc/billing"; private String testBaseDir3 = baseTestDir + "/dataBillingRC/fetlrc/billing"; private String testBaseDir4 = baseTestDir + "/sourcetarget"; - private String testBaseDir_server1source = baseTestDir + "/source1"; + private String testBaseDirServer1source = baseTestDir + "/source1"; private String testDirWithDate = testBaseDir1 + testDate; - private String testDirWithDate_sourcetarget = testBaseDir4 + testDate; - private String testDirWithDate_source1 = testBaseDir_server1source + testDate; + private String testDirWithDateSourcetarget = testBaseDir4 + testDate; + private String testDirWithDateSource1 = testBaseDirServer1source + testDate; private String testFile1 = OSUtil.RESOURCES + OSUtil.getPath("ReplicationResources", "feed-s4Replication.xml"); private String testFile2 = OSUtil.RESOURCES + OSUtil.getPath("ReplicationResources", "id.pig"); @@ -78,7 +81,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { + OSUtil.getPath("ReplicationResources", "cluster-0.1.xml"); private String testFile4 = OSUtil.RESOURCES + OSUtil.getPath("ReplicationResources", "log4testng.properties"); - private static final Logger logger = + private static final Logger LOGGER = Logger.getLogger(PrismFeedReplicationPartitionExpTest.class); @@ -99,7 +102,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { @BeforeClass(alwaysRun = true) public void createTestData() throws Exception { - logger.info("creating test data"); + 
LOGGER.info("creating test data"); uploadDataToServer3(testDirWithDate + "00/ua2/", testFile1); uploadDataToServer3(testDirWithDate + "05/ua2/", testFile2); @@ -142,37 +145,37 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { //data for test normalTest_1s2t_pst where both source target partition are required - uploadDataToServer3(testDirWithDate_sourcetarget + "00/ua3/ua2/", testFile1); - uploadDataToServer3(testDirWithDate_sourcetarget + "05/ua3/ua2/", testFile2); - uploadDataToServer3(testDirWithDate_sourcetarget + "10/ua3/ua2/", testFile3); - uploadDataToServer3(testDirWithDate_sourcetarget + "15/ua3/ua2/", testFile4); - uploadDataToServer3(testDirWithDate_sourcetarget + "20/ua3/ua2/", testFile4); + uploadDataToServer3(testDirWithDateSourcetarget + "00/ua3/ua2/", testFile1); + uploadDataToServer3(testDirWithDateSourcetarget + "05/ua3/ua2/", testFile2); + uploadDataToServer3(testDirWithDateSourcetarget + "10/ua3/ua2/", testFile3); + uploadDataToServer3(testDirWithDateSourcetarget + "15/ua3/ua2/", testFile4); + uploadDataToServer3(testDirWithDateSourcetarget + "20/ua3/ua2/", testFile4); - uploadDataToServer3(testDirWithDate_sourcetarget + "00/ua3/ua1/", testFile1); - uploadDataToServer3(testDirWithDate_sourcetarget + "05/ua3/ua1/", testFile2); - uploadDataToServer3(testDirWithDate_sourcetarget + "10/ua3/ua1/", testFile3); - uploadDataToServer3(testDirWithDate_sourcetarget + "15/ua3/ua1/", testFile4); - uploadDataToServer3(testDirWithDate_sourcetarget + "20/ua3/ua1/", testFile4); + uploadDataToServer3(testDirWithDateSourcetarget + "00/ua3/ua1/", testFile1); + uploadDataToServer3(testDirWithDateSourcetarget + "05/ua3/ua1/", testFile2); + uploadDataToServer3(testDirWithDateSourcetarget + "10/ua3/ua1/", testFile3); + uploadDataToServer3(testDirWithDateSourcetarget + "15/ua3/ua1/", testFile4); + uploadDataToServer3(testDirWithDateSourcetarget + "20/ua3/ua1/", testFile4); // data when server 1 acts as source - uploadDataToServer1(testDirWithDate_source1 + "00/ua2/", testFile1); - uploadDataToServer1(testDirWithDate_source1 + "05/ua2/", testFile2); + uploadDataToServer1(testDirWithDateSource1 + "00/ua2/", testFile1); + uploadDataToServer1(testDirWithDateSource1 + "05/ua2/", testFile2); - uploadDataToServer1(testDirWithDate_source1 + "00/ua1/", testFile1); - uploadDataToServer1(testDirWithDate_source1 + "05/ua1/", testFile2); + uploadDataToServer1(testDirWithDateSource1 + "00/ua1/", testFile1); + uploadDataToServer1(testDirWithDateSource1 + "05/ua1/", testFile2); - uploadDataToServer1(testDirWithDate_source1 + "00/ua3/", testFile1); - uploadDataToServer1(testDirWithDate_source1 + "05/ua3/", testFile2); + uploadDataToServer1(testDirWithDateSource1 + "00/ua3/", testFile1); + uploadDataToServer1(testDirWithDateSource1 + "05/ua3/", testFile2); - logger.info("completed creating test data"); + LOGGER.info("completed creating test data"); } @BeforeMethod(alwaysRun = true) public void testName(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); Bundle bundle = BundleUtil.readFeedReplicaltionBundle(); for (int i = 0; i < 3; i++) { @@ -227,17 +230,17 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { XmlUtil.createRetention("days(1000000)", ActionType.DELETE), Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, ""); - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); ServiceResponse r = 
prism.getFeedHelper().submitEntity(feed); TimeUtil.sleepSeconds(10); - AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source " + - "is blank"); + AssertUtil.assertFailed(r, "submit of feed should have failed as the partition in source " + + "is blank"); } @Test(enabled = true) - public void normalTest_1s1t1n_ps() throws Exception { + public void normalTest1s1t1nPS() throws Exception { //this test is for ideal condition when data is present in all the required places and // replication takes // place normally @@ -275,7 +278,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, "${cluster.colo}", testBaseDir1 + MINUTE_DATE_PATTERN); - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); ServiceResponse r = prism.getFeedHelper().submitEntity(feed); TimeUtil.sleepSeconds(10); @@ -336,7 +339,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { @Test(enabled = true) - public void normalTest_1s1t1n_pt() throws Exception { + public void normalTest1s1t1nPT() throws Exception { //this test is for ideal condition when data is present in all the required places and // replication takes // place normally @@ -370,7 +373,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, null, testBaseDir1 + MINUTE_DATE_PATTERN); - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); ServiceResponse r = prism.getFeedHelper().submitAndSchedule(feed); TimeUtil.sleepSeconds(10); @@ -420,7 +423,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { @Test(enabled = true) - public void normalTest_1s2t_pt() throws Exception { + public void normalTest1s2tPT() throws Exception { //this test is for ideal condition when data is present in all the required places and // replication takes // place normally @@ -462,7 +465,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, null); - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); ServiceResponse r = prism.getFeedHelper().submitEntity(feed); TimeUtil.sleepSeconds(10); @@ -518,7 +521,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { } @Test(enabled = true, groups = "embedded") - public void normalTest_2s1t_pt() throws Exception { + public void normalTest2s1tPT() throws Exception { //this test is for ideal condition when data is present in all the required places and // replication takes // place normally @@ -559,20 +562,20 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, null); //clean target if old data exists - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); ServiceResponse r = prism.getFeedHelper().submitEntity(feed); AssertUtil.assertFailed(r, "Submission of feed should have failed."); Assert.assertTrue(r.getMessage().contains( - "Partition expression has to be specified for cluster " + - Util.readEntityName(bundles[0].getClusters().get(0)) + - " as there are more than one source clusters"), + "Partition expression has to be specified for cluster " + + 
Util.readEntityName(bundles[0].getClusters().get(0)) + + " as there are more than one source clusters"), "Failed response has unexpected error message."); } @Test(enabled = true) - public void normalTest_1s2t_ps() throws Exception { + public void normalTest1s2tPS() throws Exception { //this test is for ideal condition when data is present in all the required places and // replication takes @@ -615,7 +618,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, "${cluster.colo}"); - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); ServiceResponse r = prism.getFeedHelper().submitEntity(feed); TimeUtil.sleepSeconds(10); @@ -675,7 +678,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { @Test(enabled = true) - public void normalTest_2s1t_ps() throws Exception { + public void normalTest2s1tPS() throws Exception { //this test is for ideal condition when data is present in all the required places and // replication takes // place normally @@ -702,7 +705,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { XmlUtil.createRetention("days(1000000)", ActionType.DELETE), Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, "${cluster.colo}", - testBaseDir_server1source + MINUTE_DATE_PATTERN); + testBaseDirServer1source + MINUTE_DATE_PATTERN); feed = InstanceUtil .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2099-10-01T12:25Z"), @@ -716,7 +719,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, "${cluster.colo}", testBaseDir1 + MINUTE_DATE_PATTERN); - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); ServiceResponse r = prism.getFeedHelper().submitEntity(feed); TimeUtil.sleepSeconds(10); @@ -727,7 +730,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { TimeUtil.sleepSeconds(15); InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 2, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED,20); + CoordinatorAction.Status.SUCCEEDED, EntityType.FEED, 20); //check if data has been replicated correctly @@ -746,7 +749,8 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { List<Path> ua1OriginalData00 = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster1FS, new Path(testBaseDir_server1source + testDate + "00/ua1")); + .getAllFilesRecursivelyHDFS(cluster1FS, new Path( + testBaseDirServer1source + testDate + "00/ua1")); List<Path> ua3OriginalData05 = HadoopUtil .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate + "05/ua1")); @@ -756,7 +760,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { @Test(enabled = true) - public void normalTest_1s2t_pst() throws Exception { + public void normalTest1s2tPST() throws Exception { //this test is for ideal condition when data is present in all the required places and @@ -798,7 +802,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, "${cluster.colo}", testBaseDir4 + MINUTE_DATE_PATTERN + "/"); - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); ServiceResponse r = prism.getFeedHelper().submitEntity(feed); 
TimeUtil.sleepSeconds(10); @@ -841,13 +845,17 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { List<Path> ua3OriginalData05ua1 = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate_sourcetarget + "05/ua3/ua1")); + .getAllFilesRecursivelyHDFS(cluster3FS, new Path( + testDirWithDateSourcetarget + "05/ua3/ua1")); List<Path> ua3OriginalData10ua1 = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate_sourcetarget + "10/ua3/ua1")); + .getAllFilesRecursivelyHDFS(cluster3FS, new Path( + testDirWithDateSourcetarget + "10/ua3/ua1")); List<Path> ua3OriginalData10ua2 = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate_sourcetarget + "10/ua3/ua2")); + .getAllFilesRecursivelyHDFS(cluster3FS, new Path( + testDirWithDateSourcetarget + "10/ua3/ua2")); List<Path> ua3OriginalData15ua2 = HadoopUtil - .getAllFilesRecursivelyHDFS(cluster3FS, new Path(testDirWithDate_sourcetarget + "15/ua3/ua2")); + .getAllFilesRecursivelyHDFS(cluster3FS, new Path( + testDirWithDateSourcetarget + "15/ua3/ua2")); AssertUtil.checkForListSizes(ua1ReplicatedData05, ua3OriginalData05ua1); AssertUtil.checkForListSizes(ua1ReplicatedData10, ua3OriginalData10ua1); @@ -886,7 +894,7 @@ public class PrismFeedReplicationPartitionExpTest extends BaseTestClass { XmlUtil.createRetention("days(1000000)", ActionType.DELETE), Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, ""); - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); ServiceResponse r = prism.getFeedHelper().submitEntity(feed); TimeUtil.sleepSeconds(10); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java index 63011b8..76345e0 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java @@ -45,24 +45,27 @@ import org.testng.annotations.Test; import java.io.IOException; import java.lang.reflect.Method; +/** + * Update replication feed tests. 
+ */ @Test(groups = "embedded") public class PrismFeedReplicationUpdateTest extends BaseTestClass { - ColoHelper cluster1 = servers.get(0); - ColoHelper cluster2 = servers.get(1); - ColoHelper cluster3 = servers.get(2); - FileSystem cluster1FS = serverFS.get(0); - FileSystem cluster2FS = serverFS.get(1); - FileSystem cluster3FS = serverFS.get(2); - String cluster1Colo = cluster1.getClusterHelper().getColoName(); - String cluster2Colo = cluster2.getClusterHelper().getColoName(); - String cluster3Colo = cluster3.getClusterHelper().getColoName(); + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private ColoHelper cluster3 = servers.get(2); + private FileSystem cluster1FS = serverFS.get(0); + private FileSystem cluster2FS = serverFS.get(1); + private FileSystem cluster3FS = serverFS.get(2); + private String cluster1Colo = cluster1.getClusterHelper().getColoName(); + private String cluster2Colo = cluster2.getClusterHelper().getColoName(); + private String cluster3Colo = cluster3.getClusterHelper().getColoName(); private final String baseTestDir = baseHDFSDir + "/PrismFeedReplicationUpdateTest"; private final String inputPath = baseTestDir + "/input-data" + MINUTE_DATE_PATTERN; - private String alternativeInputPath = baseTestDir + "/newFeedPath/input-data" + - MINUTE_DATE_PATTERN; + private String alternativeInputPath = baseTestDir + "/newFeedPath/input-data" + + MINUTE_DATE_PATTERN; private String aggregateWorkflowDir = baseTestDir + "/aggregator"; - private static final Logger logger = Logger.getLogger(PrismFeedReplicationUpdateTest.class); + private static final Logger LOGGER = Logger.getLogger(PrismFeedReplicationUpdateTest.class); @BeforeClass(alwaysRun = true) public void prepareCluster() throws IOException { @@ -72,7 +75,7 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); Bundle bundle = BundleUtil.readELBundle(); for (int i = 0; i < 3; i++) { @@ -139,7 +142,7 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE, "UK/${cluster.colo}"); - logger.info("feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed)); AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(feed)); @@ -147,7 +150,7 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { //change feed location path feed = InstanceUtil.setFeedFilePath(feed, alternativeInputPath); - logger.info("updated feed: " + Util.prettyPrintXml(feed)); + LOGGER.info("updated feed: " + Util.prettyPrintXml(feed)); //update feed AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed, feed)); @@ -180,7 +183,7 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { * @throws Exception */ @Test(enabled = true, timeOut = 1800000) - public void updateFeed_dependentProcessTest() throws Exception { + public void updateFeedDependentProcessTest() throws Exception { //set cluster colos bundles[0].setCLusterColo(cluster1Colo); bundles[1].setCLusterColo(cluster2Colo); @@ -292,7 +295,7 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { //submit and schedule process AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process)); - logger.info("Wait till 
process goes into running "); + LOGGER.info("Wait till process goes into running "); int timeout = OSUtil.IS_WINDOWS ? 50 : 25; InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(process), 1, @@ -301,7 +304,7 @@ public class PrismFeedReplicationUpdateTest extends BaseTestClass { Status.RUNNING, EntityType.PROCESS, timeout); feed01 = InstanceUtil.setFeedFilePath(feed01, alternativeInputPath); - logger.info("updated feed: " + Util.prettyPrintXml(feed01)); + LOGGER.info("updated feed: " + Util.prettyPrintXml(feed01)); AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed01, feed01)); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedResumeTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedResumeTest.java index 79ceacc..79b12f1 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedResumeTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedResumeTest.java @@ -39,16 +39,19 @@ import org.testng.annotations.Test; import java.io.IOException; import java.lang.reflect.Method; +/** + * Resume feed via prism tests. + */ @Test(groups = "distributed") public class PrismFeedResumeTest extends BaseTestClass { - ColoHelper cluster1 = servers.get(0); - ColoHelper cluster2 = servers.get(1); - OozieClient cluster1OC = serverOC.get(0); - OozieClient cluster2OC = serverOC.get(1); + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private OozieClient cluster1OC = serverOC.get(0); + private OozieClient cluster2OC = serverOC.get(1); private boolean restartRequired; - String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedResumeTest/aggregator"; - private static final Logger logger = Logger.getLogger(PrismFeedResumeTest.class); + private String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedResumeTest/aggregator"; + private static final Logger LOGGER = Logger.getLogger(PrismFeedResumeTest.class); @BeforeClass(alwaysRun = true) public void uploadWorkflow() throws Exception { @@ -57,7 +60,7 @@ public class PrismFeedResumeTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void testName(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); Bundle bundle = BundleUtil.readLateDataBundle(); for (int i = 0; i < 2; i++) { http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java index 9a32932..69e8918 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java @@ -37,13 +37,16 @@ import org.testng.annotations.Test; import java.io.IOException; import java.lang.reflect.Method; +/** + * 
Schedule feed via prism tests. + */ @Test(groups = "embedded") public class PrismFeedScheduleTest extends BaseTestClass { - OozieClient cluster1OC = serverOC.get(0); - OozieClient cluster2OC = serverOC.get(1); - String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedScheduleTest/aggregator"; - private static final Logger logger = Logger.getLogger(PrismFeedScheduleTest.class); + private OozieClient cluster1OC = serverOC.get(0); + private OozieClient cluster2OC = serverOC.get(1); + private String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedScheduleTest/aggregator"; + private static final Logger LOGGER = Logger.getLogger(PrismFeedScheduleTest.class); @BeforeClass(alwaysRun = true) public void uploadWorkflow() throws Exception { @@ -52,7 +55,7 @@ public class PrismFeedScheduleTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp(Method method) throws IOException { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); Bundle bundle = BundleUtil.readLateDataBundle(); for (int i = 0; i < 2; i++) { @@ -76,8 +79,8 @@ public class PrismFeedScheduleTest extends BaseTestClass { @Test(groups = {"prism", "0.2"}) public void testFeedScheduleOn1ColoWhileAnotherColoHasSuspendedFeed() throws Exception { - logger.info("cluster: " + Util.prettyPrintXml(bundles[0].getClusters().get(0))); - logger.info("feed: " + Util.prettyPrintXml(bundles[0].getDataSets().get(0))); + LOGGER.info("cluster: " + Util.prettyPrintXml(bundles[0].getClusters().get(0))); + LOGGER.info("feed: " + Util.prettyPrintXml(bundles[0].getDataSets().get(0))); bundles[0].submitAndScheduleFeed(); AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(bundles[0].getDataSets().get(0))); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java index b8f3095..1cf44b7 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java @@ -45,16 +45,19 @@ import org.testng.annotations.Test; import java.io.IOException; import java.lang.reflect.Method; +/** + * Submit and schedule feed via prism tests. 
+ */ public class PrismFeedSnSTest extends BaseTestClass { - ColoHelper cluster1 = servers.get(0); - ColoHelper cluster2 = servers.get(1); - OozieClient cluster1OC = serverOC.get(0); - OozieClient cluster2OC = serverOC.get(1); + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private OozieClient cluster1OC = serverOC.get(0); + private OozieClient cluster2OC = serverOC.get(1); private boolean restartRequired; - String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedSnSTest/aggregator"; - private static final Logger logger = Logger.getLogger(PrismFeedSnSTest.class); - String feed1, feed2; + private String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedSnSTest/aggregator"; + private static final Logger LOGGER = Logger.getLogger(PrismFeedSnSTest.class); + private String feed1, feed2; @BeforeClass(alwaysRun = true) public void uploadWorkflow() throws Exception { @@ -63,7 +66,7 @@ public class PrismFeedSnSTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); restartRequired = false; Bundle bundle = BundleUtil.readELBundle(); for (int i = 0; i < 2; i++) { @@ -262,7 +265,6 @@ public class PrismFeedSnSTest extends BaseTestClass { /** * Submit and schedule feed1 on cluster1 and check that it failed. Repeat for feed2. - * TODO: should be reviewed */ @Test(groups = {"prism", "0.2", "distributed"}) public void testFeedSnSOnBothColosUsingColoHelper() throws Exception { @@ -369,12 +371,12 @@ public class PrismFeedSnSTest extends BaseTestClass { String clust2 = bundles[1].getClusters().get(0); bundles[0].setCLusterColo(cluster1.getClusterHelper().getColoName()); - logger.info("cluster bundles[0]: " + Util.prettyPrintXml(clust1)); + LOGGER.info("cluster bundles[0]: " + Util.prettyPrintXml(clust1)); ServiceResponse r = prism.getClusterHelper().submitEntity(clust1); Assert.assertTrue(r.getMessage().contains("SUCCEEDED")); bundles[1].setCLusterColo(cluster2.getClusterHelper().getColoName()); - logger.info("cluster bundles[1]: " + Util.prettyPrintXml(clust2)); + LOGGER.info("cluster bundles[1]: " + Util.prettyPrintXml(clust2)); r = prism.getClusterHelper().submitEntity(clust2); Assert.assertTrue(r.getMessage().contains("SUCCEEDED")); @@ -395,9 +397,9 @@ public class PrismFeedSnSTest extends BaseTestClass { feed = InstanceUtil .setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2099-10-01T12:25Z"), XmlUtil.createRetention("days(10000)", ActionType.DELETE), - Util.readEntityName(clust2), ClusterType.TARGET, null, baseHDFSDir + - "/clusterPath/localDC/rc/billing" + MINUTE_DATE_PATTERN); - logger.info("feed: " + Util.prettyPrintXml(feed)); + Util.readEntityName(clust2), ClusterType.TARGET, null, baseHDFSDir + + "/clusterPath/localDC/rc/billing" + MINUTE_DATE_PATTERN); + LOGGER.info("feed: " + Util.prettyPrintXml(feed)); Util.shutDownService(cluster1.getFeedHelper()); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java index 7de5e39..9c3b1c0 100644 --- 
a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSuspendTest.java @@ -39,14 +39,17 @@ import org.testng.annotations.Test; import java.io.IOException; import java.lang.reflect.Method; +/** + * Suspend feed via prism tests. + */ public class PrismFeedSuspendTest extends BaseTestClass { - ColoHelper cluster1 = servers.get(0); - ColoHelper cluster2 = servers.get(1); - OozieClient cluster1OC = serverOC.get(0); - OozieClient cluster2OC = serverOC.get(1); - String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedSuspendTest/aggregator"; - private static final Logger logger = Logger.getLogger(PrismFeedSuspendTest.class); + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private OozieClient cluster1OC = serverOC.get(0); + private OozieClient cluster2OC = serverOC.get(1); + private String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedSuspendTest/aggregator"; + private static final Logger LOGGER = Logger.getLogger(PrismFeedSuspendTest.class); @BeforeClass(alwaysRun = true) public void uploadWorkflow() throws Exception { @@ -55,7 +58,7 @@ public class PrismFeedSuspendTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); Bundle bundle = BundleUtil.readELBundle(); for (int i = 0; i < 2; i++) { bundles[i] = new Bundle(bundle, servers.get(i)); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java index 9b5d770..902ec23 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java @@ -54,20 +54,23 @@ import java.util.List; import java.util.Random; +/** + * Update feed via prism tests. 
+ */ @Test(groups = "embedded") public class PrismFeedUpdateTest extends BaseTestClass { - ColoHelper cluster1 = servers.get(0); - ColoHelper cluster2 = servers.get(1); - FileSystem server1FS = serverFS.get(0); - OozieClient cluster1OC = serverOC.get(0); - String baseTestDir = baseHDFSDir + "/PrismFeedUpdateTest"; - String aggregateWorkflowDir = baseTestDir + "/aggregator"; - String workflowForNoIpOp = baseHDFSDir + "/PrismProcessScheduleTest/noinop"; - public final String cluster1colo = cluster1.getClusterHelper().getColoName(); - public final String cluster2colo = cluster2.getClusterHelper().getColoName(); - private static final Logger logger = Logger.getLogger(PrismFeedUpdateTest.class); - String feedInputTimedOutPath = baseTestDir + "/timedout" + MINUTE_DATE_PATTERN; + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private FileSystem server1FS = serverFS.get(0); + private OozieClient cluster1OC = serverOC.get(0); + private String baseTestDir = baseHDFSDir + "/PrismFeedUpdateTest"; + private String aggregateWorkflowDir = baseTestDir + "/aggregator"; + private String workflowForNoIpOp = baseHDFSDir + "/PrismProcessScheduleTest/noinop"; + private final String cluster1colo = cluster1.getClusterHelper().getColoName(); + private final String cluster2colo = cluster2.getClusterHelper().getColoName(); + private static final Logger LOGGER = Logger.getLogger(PrismFeedUpdateTest.class); + private String feedInputTimedOutPath = baseTestDir + "/timedout" + MINUTE_DATE_PATTERN; @BeforeClass(alwaysRun = true) public void uploadWorkflow() throws Exception { @@ -77,7 +80,7 @@ public class PrismFeedUpdateTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); Bundle bundle = BundleUtil.readELBundle(); for (int i = 0; i < 2; i++) { bundles[i] = new Bundle(bundle, servers.get(i)); @@ -97,7 +100,7 @@ public class PrismFeedUpdateTest extends BaseTestClass { * queue. TODO : complete test case */ @Test(enabled = true, timeOut = 1200000) - public void updateFeedQueue_dependentMultipleProcess_oneProcessZeroInput() throws Exception { + public void updateFeedQueueDependentMultipleProcessOneProcessZeroInput() throws Exception { //cluster1colo and cluster2colo are source. 
feed01 on cluster1colo target cluster2colo, // feed02 on cluster2colo target cluster1colo bundles[0].setProcessWorkflow(workflowForNoIpOp); @@ -106,9 +109,9 @@ public class PrismFeedUpdateTest extends BaseTestClass { //set cluster colos bundles[0].setCLusterColo(cluster1colo); - logger.info("cluster bundles[0]: " + Util.prettyPrintXml(cluster1Def)); + LOGGER.info("cluster bundles[0]: " + Util.prettyPrintXml(cluster1Def)); bundles[1].setCLusterColo(cluster2colo); - logger.info("cluster bundles[1]: " + Util.prettyPrintXml(cluster2Def)); + LOGGER.info("cluster bundles[1]: " + Util.prettyPrintXml(cluster2Def)); //submit 2 clusters AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(cluster1Def)); @@ -162,8 +165,8 @@ public class PrismFeedUpdateTest extends BaseTestClass { Util.readEntityName(cluster2Def), ClusterType.TARGET, null); //submit and schedule feeds - logger.info("feed01: " + Util.prettyPrintXml(feed01)); - logger.info("outputFeed: " + Util.prettyPrintXml(outputFeed)); + LOGGER.info("feed01: " + Util.prettyPrintXml(feed01)); + LOGGER.info("outputFeed: " + Util.prettyPrintXml(outputFeed)); AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed01)); AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(outputFeed)); @@ -195,11 +198,11 @@ public class PrismFeedUpdateTest extends BaseTestClass { process02 = processMerlin.toString(); //submit and schedule both process - logger.info("process: " + Util.prettyPrintXml(process01)); - logger.info("process: " + Util.prettyPrintXml(process02)); + LOGGER.info("process: " + Util.prettyPrintXml(process01)); + LOGGER.info("process: " + Util.prettyPrintXml(process02)); AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process01)); AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process02)); - logger.info("Wait till process goes into running "); + LOGGER.info("Wait till process goes into running "); InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(process01), 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 1); InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(process02), 1, @@ -207,7 +210,7 @@ public class PrismFeedUpdateTest extends BaseTestClass { //change feed location path outputFeed = Util.setFeedProperty(outputFeed, "queueName", "myQueue"); - logger.info("updated feed: " + Util.prettyPrintXml(outputFeed)); + LOGGER.info("updated feed: " + Util.prettyPrintXml(outputFeed)); //update feed first time AssertUtil.assertSucceeded(prism.getFeedHelper().update(outputFeed, outputFeed)); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java index 05b5276..99f3acc 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessDeleteTest.java @@ -43,14 +43,17 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; +/** + * Delete process via prism tests. 
+ */ @Test(groups = "distributed") public class PrismProcessDeleteTest extends BaseTestClass { - Bundle bundle; - ColoHelper cluster1 = servers.get(0); - ColoHelper cluster2 = servers.get(1); - String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessDeleteTest/aggregator"; - private static final Logger logger = Logger.getLogger(PrismProcessDeleteTest.class); + private Bundle bundle; + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessDeleteTest/aggregator"; + private static final Logger LOGGER = Logger.getLogger(PrismProcessDeleteTest.class); @BeforeClass(alwaysRun = true) public void uploadWorkflow() throws Exception { @@ -59,7 +62,7 @@ public class PrismProcessDeleteTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); bundle = BundleUtil.readLateDataBundle(); for (int i = 0; i < 2; i++) { bundles[i] = new Bundle(bundle, servers.get(i)); @@ -189,7 +192,7 @@ public class PrismProcessDeleteTest extends BaseTestClass { compareDataStoreStates(systemPostUp.get("ua1Archive"), finalUA1ArchiveStore, clusterName); } catch (Exception e) { - logger.info(e.getMessage()); + LOGGER.info(e.getMessage()); throw new TestNGException(e.getMessage()); } finally { Util.restartService(cluster2.getClusterHelper()); @@ -242,7 +245,7 @@ public class PrismProcessDeleteTest extends BaseTestClass { compareDataStoresForEquality(initialUA1Store, finalUA1Store); compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore); } catch (Exception e) { - logger.info(e.getMessage()); + LOGGER.info(e.getMessage()); throw new TestNGException(e.getMessage()); } } @@ -305,7 +308,7 @@ public class PrismProcessDeleteTest extends BaseTestClass { compareDataStoreStates(finalUA1ArchiveStore, initialUA1ArchiveStore, clusterName); } catch (Exception e) { - logger.info(e.getMessage()); + LOGGER.info(e.getMessage()); throw new TestNGException(e.getMessage()); } finally { Util.restartService(cluster2.getClusterHelper()); @@ -356,7 +359,7 @@ public class PrismProcessDeleteTest extends BaseTestClass { compareDataStoresForEquality(initialUA1Store, finalUA1Store); compareDataStoresForEquality(initialUA1ArchiveStore, finalUA1ArchiveStore); } catch (Exception e) { - logger.info(e.getMessage()); + LOGGER.info(e.getMessage()); throw new TestNGException(e.getMessage()); } } @@ -415,7 +418,7 @@ public class PrismProcessDeleteTest extends BaseTestClass { prism.getProcessHelper().delete(bundles[0].getProcessData())); } catch (Exception e) { - logger.info(e.getMessage()); + LOGGER.info(e.getMessage()); throw new TestNGException(e.getMessage()); } finally { Util.restartService(cluster2.getClusterHelper()); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java index c2b9681..c3d8c00 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java +++ 
b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessResumeTest.java @@ -39,15 +39,18 @@ import org.testng.annotations.Test; import java.io.IOException; import java.lang.reflect.Method; +/** + * Resume process via prism tests. + */ @Test(groups = "distributed") public class PrismProcessResumeTest extends BaseTestClass { - ColoHelper cluster1 = servers.get(0); - ColoHelper cluster2 = servers.get(1); - OozieClient cluster1OC = serverOC.get(0); - OozieClient cluster2OC = serverOC.get(1); - String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessResumeTest/aggregator"; - private static final Logger logger = Logger.getLogger(PrismProcessResumeTest.class); + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private OozieClient cluster1OC = serverOC.get(0); + private OozieClient cluster2OC = serverOC.get(1); + private String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessResumeTest/aggregator"; + private static final Logger LOGGER = Logger.getLogger(PrismProcessResumeTest.class); @BeforeClass(alwaysRun = true) public void uploadWorkflow() throws Exception { @@ -56,7 +59,7 @@ public class PrismProcessResumeTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); Bundle bundle = BundleUtil.readLateDataBundle(); for (int i = 0; i < 2; i++) { bundles[i] = new Bundle(bundle, servers.get(i)); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java index ce6f675..fdbbfdb 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java @@ -46,17 +46,20 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; +/** + * Schedule process via prism tests. 
+ */ public class PrismProcessScheduleTest extends BaseTestClass { - ColoHelper cluster1 = servers.get(0); - ColoHelper cluster2 = servers.get(1); - OozieClient cluster1OC = serverOC.get(0); - OozieClient cluster2OC = serverOC.get(1); - String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessScheduleTest/aggregator"; - String workflowForNoIpOp = baseHDFSDir + "/PrismProcessScheduleTest/noinop"; - private static final Logger logger = Logger.getLogger(PrismProcessScheduleTest.class); - String process1; - String process2; + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private OozieClient cluster1OC = serverOC.get(0); + private OozieClient cluster2OC = serverOC.get(1); + private String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessScheduleTest/aggregator"; + private String workflowForNoIpOp = baseHDFSDir + "/PrismProcessScheduleTest/noinop"; + private static final Logger LOGGER = Logger.getLogger(PrismProcessScheduleTest.class); + private String process1; + private String process2; @BeforeClass(alwaysRun = true) public void uploadWorkflow() throws Exception { @@ -66,7 +69,7 @@ public class PrismProcessScheduleTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); Bundle bundle = BundleUtil.readLateDataBundle(); for (int i = 0; i < 2; i++) { bundles[i] = new Bundle(bundle, servers.get(i)); @@ -328,15 +331,15 @@ public class PrismProcessScheduleTest extends BaseTestClass { String process = bundles[0].getProcessData(); try { hadoopFileEditor = new HadoopFileEditor(cluster1.getClusterHelper().getHadoopFS()); - hadoopFileEditor.edit(new ProcessMerlin(process).getWorkflow().getPath() + - "/workflow.xml", "<value>${outputData}</value>", - "<property>\n" + - " <name>randomProp</name>\n" + - " <value>randomValue</value>\n" + - " </property>"); + hadoopFileEditor.edit(new ProcessMerlin(process).getWorkflow().getPath() + + "/workflow.xml", "<value>${outputData}</value>", + "<property>\n" + + " <name>randomProp</name>\n" + + " <value>randomValue</value>\n" + + " </property>"); bundles[0].submitFeedsScheduleProcess(prism); InstanceUtil.waitForBundleToReachState(cluster1, - Util.readEntityName(process),Job.Status.KILLED); + Util.readEntityName(process), Job.Status.KILLED); String oldBundleID = InstanceUtil.getLatestBundleID(cluster1, Util.readEntityName(process), EntityType.PROCESS); prism.getProcessHelper().delete(process); @@ -349,49 +352,49 @@ public class PrismProcessScheduleTest extends BaseTestClass { hadoopFileEditor.restore(); } } - } + } - /** - * Schedule a process that contains no inputs. The process should be successfully scheduled. 
- * - * @throws Exception - */ - @Test(groups = {"prism", "0.2", "embedded"}, enabled = true, timeOut = 1800000) - public void testScheduleWhenZeroInputs()throws Exception{ - bundles[0].submitClusters(prism); - bundles[0].setProcessWorkflow(workflowForNoIpOp); - ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData()); - processObj.setInputs(null); - processObj.setLateProcess(null); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(bundles[0].getOutputFeedFromBundle())); - AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule( - processObj.toString())); - InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(processObj.toString()), 2, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 10); - } - - /** - * Schedule a process that contains no inputs or outputs. The process should be successfully scheduled. - * - * @throws Exception - */ - @Test(groups = {"prism", "0.2", "embedded"}, enabled = true, timeOut = 1800000) - public void testScheduleWhenZeroInputsZeroOutputs()throws Exception{ - bundles[0].submitClusters(prism); - bundles[0].setProcessWorkflow(workflowForNoIpOp); - ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData()); - processObj.setInputs(null); - processObj.setOutputs(null); - processObj.setLateProcess(null); - AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule( - processObj.toString())); - InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(processObj.toString()), 2, - CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 10); - } - - - @AfterClass(alwaysRun = true) - public void tearDownClass() throws IOException { - cleanTestDirs(); - } + /** + * Schedule a process that contains no inputs. The process should be successfully scheduled. + * + * @throws Exception + */ + @Test(groups = {"prism", "0.2", "embedded"}, enabled = true, timeOut = 1800000) + public void testScheduleWhenZeroInputs()throws Exception{ + bundles[0].submitClusters(prism); + bundles[0].setProcessWorkflow(workflowForNoIpOp); + ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData()); + processObj.setInputs(null); + processObj.setLateProcess(null); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(bundles[0].getOutputFeedFromBundle())); + AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule( + processObj.toString())); + InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(processObj.toString()), 2, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 10); + } + + /** + * Schedule a process that contains no inputs or outputs. The process should be successfully scheduled. 
+ * + * @throws Exception + */ + @Test(groups = {"prism", "0.2", "embedded"}, enabled = true, timeOut = 1800000) + public void testScheduleWhenZeroInputsZeroOutputs()throws Exception{ + bundles[0].submitClusters(prism); + bundles[0].setProcessWorkflow(workflowForNoIpOp); + ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData()); + processObj.setInputs(null); + processObj.setOutputs(null); + processObj.setLateProcess(null); + AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule( + processObj.toString())); + InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(processObj.toString()), 2, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 10); + } + + + @AfterClass(alwaysRun = true) + public void tearDownClass() throws IOException { + cleanTestDirs(); + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/387604d1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java index a83b8f4..6dd6341 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java @@ -41,16 +41,19 @@ import org.testng.annotations.Test; import java.io.IOException; import java.lang.reflect.Method; +/** + * Submit and schedule process via prism tests. + */ public class PrismProcessSnSTest extends BaseTestClass { - ColoHelper cluster1 = servers.get(0); - ColoHelper cluster2 = servers.get(1); - OozieClient cluster1OC = serverOC.get(0); - OozieClient cluster2OC = serverOC.get(1); - String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessSnSTest/aggregator"; - private static final Logger logger = Logger.getLogger(PrismProcessSnSTest.class); - String process1; - String process2; + private ColoHelper cluster1 = servers.get(0); + private ColoHelper cluster2 = servers.get(1); + private OozieClient cluster1OC = serverOC.get(0); + private OozieClient cluster2OC = serverOC.get(1); + private String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessSnSTest/aggregator"; + private static final Logger LOGGER = Logger.getLogger(PrismProcessSnSTest.class); + private String process1; + private String process2; @BeforeClass(alwaysRun = true) public void uploadWorkflow() throws Exception { @@ -59,7 +62,7 @@ public class PrismProcessSnSTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp(Method method) throws Exception { - logger.info("test name: " + method.getName()); + LOGGER.info("test name: " + method.getName()); Bundle bundle = BundleUtil.readLateDataBundle(); for (int i = 0; i < 2; i++) { bundles[i] = new Bundle(bundle, servers.get(i)); @@ -91,7 +94,7 @@ public class PrismProcessSnSTest extends BaseTestClass { //check if there is no criss cross ServiceResponse response = prism.getProcessHelper().getStatus(process2); - logger.info(response.getMessage()); + LOGGER.info(response.getMessage()); AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING); } @@ -219,13 +222,13 @@ public class PrismProcessSnSTest extends BaseTestClass { AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process2)); 
Assert.assertEquals(Util.parseResponse(prism.getProcessHelper() - .getStatus(process1)).getMessage(),cluster1Running); + .getStatus(process1)).getMessage(), cluster1Running); Assert.assertEquals(Util.parseResponse(prism.getProcessHelper() .getStatus(process2)).getMessage(), cluster2Running); } /** - * Attempt to submit and schedule processes when all required entities weren't registered + * Attempt to submit and schedule processes when all required entities weren't registered. */ @Test(groups = {"prism", "0.2", "distributed"}) public void testScheduleNonExistentProcessOnBothColos() throws Exception {
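The zero-input scheduling tests reindented in the PrismProcessScheduleTest hunks above all share one setup shape: strip the optional sections off the process entity, then submit and schedule it through prism. A minimal sketch of that shape, using only calls visible in those hunks (workflowForNoIpOp and cluster1OC are fields of the test class):

    // Submit the clusters and point the process at a workflow needing no input/output.
    bundles[0].submitClusters(prism);
    bundles[0].setProcessWorkflow(workflowForNoIpOp);

    // Drop inputs, outputs and late-data handling from the process definition.
    ProcessMerlin processObj = new ProcessMerlin(bundles[0].getProcessData());
    processObj.setInputs(null);
    processObj.setOutputs(null);
    processObj.setLateProcess(null);

    // Schedule via prism and wait for the first instances to succeed on cluster1.
    AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(processObj.toString()));
    InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(processObj.toString()),
        2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 10);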
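Likewise, the PrismProcessSnSTest hunks above all exercise the same submit-and-schedule flow. A minimal sketch of that flow, assuming only the helper calls already visible in this diff (process1, process2, cluster1Running and cluster2Running are fields set up elsewhere in the test class):

    // Submit and schedule one process per colo through the prism server.
    AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process1));
    AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(process2));

    // Each process should report RUNNING against its own colo only.
    Assert.assertEquals(Util.parseResponse(prism.getProcessHelper()
        .getStatus(process1)).getMessage(), cluster1Running);
    Assert.assertEquals(Util.parseResponse(prism.getProcessHelper()
        .getStatus(process2)).getMessage(), cluster2Running);

    // Guard against criss-cross scheduling: the first bundle's process
    // must not show up as RUNNING on the second colo's Oozie.
    AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);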