Repository: falcon
Updated Branches:
  refs/heads/master b0a40c5ab -> ccdf02e7e
FALCON-1058 Test for Feed Replication with Empty Directories. Contributed by Pragya M


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ccdf02e7
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ccdf02e7
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ccdf02e7

Branch: refs/heads/master
Commit: ccdf02e7e9dd95522a1cc1ad583d556ebf92e0d9
Parents: b0a40c5
Author: samarthg <[email protected]>
Authored: Wed Mar 4 07:00:01 2015 +0000
Committer: samarthg <[email protected]>
Committed: Wed Mar 4 07:00:01 2015 +0000

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   3 +
 .../falcon/regression/FeedReplicationTest.java  | 177 +++++++++++--------
 2 files changed, 103 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/ccdf02e7/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 8df604a..1813952 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -55,10 +55,13 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
+   FALCON-1058 Test for Feed Replication with Empty Directories(Pragya M via Samarth Gupta)
 
    FALCON-1046 Add test for process update with user feature(Karishma G via Samarth Gupta)
 
+   FALCON-1017 FeedReplicationTest modified to check for _SUCCESS getting created on target directory(Pragya M via Samarth G)
+   FALCON-1040 Modifying ProcessInstanceStatusTest to expose job id for running jobs in Falcon. (Pragya M via Samarth G)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/ccdf02e7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
index 3930c8c..f3a8318 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
@@ -47,6 +47,7 @@ import org.joda.time.format.DateTimeFormatter;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import javax.xml.bind.JAXBException;
@@ -56,6 +57,7 @@ import java.util.List;
 
 /**
  * feed replication test.
+ * Replicates empty directories as well as directories containing data.
  */
 @Test(groups = "embedded")
 public class FeedReplicationTest extends BaseTestClass {
@@ -89,8 +91,9 @@ public class FeedReplicationTest extends BaseTestClass {
     }
 
     @AfterMethod(alwaysRun = true)
-    public void tearDown() {
+    public void tearDown() throws IOException {
         removeTestClassEntities();
+        cleanTestsDirs();
     }
 
     /**
@@ -99,10 +102,10 @@ public class FeedReplicationTest extends BaseTestClass {
      * replication ends test checks if data was replicated correctly.
      * Also checks for presence of _SUCCESS file in target directory.
      */
-    @Test
-    public void replicate1Source1Target()
+    @Test(dataProvider = "dataFlagProvider")
+    public void replicate1Source1Target(boolean dataFlag)
         throws AuthenticationException, IOException, URISyntaxException, JAXBException,
-        OozieClientException, InterruptedException {
+            OozieClientException, InterruptedException {
         Bundle.submitCluster(bundles[0], bundles[1]);
         String startTime = TimeUtil.getTimeWrtSystemTime(0);
         String endTime = TimeUtil.addMinsToTime(startTime, 5);
@@ -115,19 +118,19 @@ public class FeedReplicationTest extends BaseTestClass {
         feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
         //set cluster1 as source
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.SOURCE)
+                        .build()).toString();
         //set cluster2 as target
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.TARGET)
+                        .withDataLocation(targetDataLocation)
+                        .build()).toString();
 
         //submit and schedule feed
         LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
@@ -143,26 +146,28 @@ public class FeedReplicationTest extends BaseTestClass {
         Path toSource = new Path(sourceLocation);
         Path toTarget = new Path(targetLocation);
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-            OSUtil.RESOURCES + "feed-s4Replication.xml");
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+        if (dataFlag) {
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
+                OSUtil.RESOURCES + "feed-s4Replication.xml");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+        }
 
         //check if coordinator exists
         InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
 
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
-                "REPLICATION"), 1);
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                    "REPLICATION"), 1);
 
         //replication should start, wait while it ends
         InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data has been replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
+                .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
         List<Path> cluster2ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
+                .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
 
         AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
@@ -179,8 +184,8 @@ public class FeedReplicationTest extends BaseTestClass {
      * targets. When replication ends test checks if data was replicated correctly.
      * Also checks for presence of _SUCCESS file in target directory.
      */
-    @Test
-    public void replicate1Source2Targets() throws Exception {
+    @Test(dataProvider = "dataFlagProvider")
+    public void replicate1Source2Targets(boolean dataFlag) throws Exception {
         Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
         String startTime = TimeUtil.getTimeWrtSystemTime(0);
         String endTime = TimeUtil.addMinsToTime(startTime, 5);
@@ -193,27 +198,27 @@ public class FeedReplicationTest extends BaseTestClass {
         feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
         //set cluster1 as source
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.SOURCE)
+                        .build()).toString();
         //set cluster2 as target
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.TARGET)
+                        .withDataLocation(targetDataLocation)
+                        .build()).toString();
         //set cluster3 as target
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.TARGET)
+                        .withDataLocation(targetDataLocation)
+                        .build()).toString();
 
         //submit and schedule feed
         LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
@@ -229,9 +234,12 @@ public class FeedReplicationTest extends BaseTestClass {
         Path toSource = new Path(sourceLocation);
         Path toTarget = new Path(targetLocation);
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-            OSUtil.RESOURCES + "feed-s4Replication.xml");
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+
+        if (dataFlag) {
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
+                OSUtil.RESOURCES + "feed-s4Replication.xml");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+        }
 
         //check if all coordinators exist
         InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
@@ -239,26 +247,26 @@ public class FeedReplicationTest extends BaseTestClass {
         InstanceUtil.waitTillInstancesAreCreated(cluster3, feed, 0);
 
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
-                "REPLICATION"), 1);
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                    "REPLICATION"), 1);
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
-                "REPLICATION"), 1);
+                .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
+                    "REPLICATION"), 1);
 
         //replication on cluster 2 should start, wait till it ends
         InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //replication on cluster 3 should start, wait till it ends
         InstanceUtil.waitTillInstanceReachState(cluster3OC, Util.readEntityName(feed), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data has been replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
+                .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
         List<Path> cluster2ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
+                .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
         List<Path> cluster3ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster3FS, toTarget);
+                .getAllFilesRecursivelyHDFS(cluster3FS, toTarget);
 
         AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
         AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster3ReplicatedData);
@@ -279,8 +287,8 @@ public class FeedReplicationTest extends BaseTestClass {
      * replication starts and when it ends test checks if data was replicated correctly.
      * Also checks for presence of availability flag in target directory.
      */
-    @Test
-    public void availabilityFlagTest() throws Exception {
+    @Test(dataProvider = "dataFlagProvider")
+    public void availabilityFlagTest(boolean dataFlag) throws Exception {
         //replicate1Source1Target scenario + set availability flag but don't upload required file
         Bundle.submitCluster(bundles[0], bundles[1]);
         String startTime = TimeUtil.getTimeWrtSystemTime(0);
@@ -299,19 +307,19 @@ public class FeedReplicationTest extends BaseTestClass {
         feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
         //set cluster1 as source
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.SOURCE)
+                        .build()).toString();
         //set cluster2 as target
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.TARGET)
+                        .withDataLocation(targetDataLocation)
+                        .build()).toString();
 
         //submit and schedule feed
         LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
@@ -327,42 +335,44 @@ public class FeedReplicationTest extends BaseTestClass {
         Path toSource = new Path(sourceLocation);
         Path toTarget = new Path(targetLocation);
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-            OSUtil.RESOURCES + "feed-s4Replication.xml");
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+        if (dataFlag) {
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
+                OSUtil.RESOURCES + "feed-s4Replication.xml");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+        }
 
         //check while instance is got created
         InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
         //check if coordinator exists
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), feedName, "REPLICATION"), 1);
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(), feedName, "REPLICATION"), 1);
 
         //replication should not start even after time
         TimeUtil.sleepSeconds(60);
         InstancesResult r = prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startTime + "&end=" + endTime);
+                "?start=" + startTime + "&end=" + endTime);
         InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
         LOGGER.info("Replication didn't start.");
 
         //create availability flag on source
         HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-            OSUtil.RESOURCES + availabilityFlagName);
+                OSUtil.RESOURCES + availabilityFlagName);
 
         //check if instance become running
         InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-            CoordinatorAction.Status.RUNNING, EntityType.FEED);
+                CoordinatorAction.Status.RUNNING, EntityType.FEED);
 
         //wait till instance succeed
         InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data was replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
+                .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
         LOGGER.info("Data on source cluster: " + cluster1ReplicatedData);
         List<Path> cluster2ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
+                .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
         LOGGER.info("Data on target cluster: " + cluster2ReplicatedData);
 
         AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
@@ -372,4 +382,17 @@ public class FeedReplicationTest extends BaseTestClass {
         //availabilityFlag should exist in target
         Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, availabilityFlagName), true);
     }
+
+
+    /* Flag value denotes whether to add data for replication or not.
+     * flag=true : add data for replication.
+     * flag=false : let empty directories be replicated.
+     */
+    @DataProvider(name = "dataFlagProvider")
+    private Object[][] dataFlagProvider() {
+        return new Object[][] {
+            new Object[] {true, },
+            new Object[] {false, },
+        };
+    }
 }
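
The heart of the change is the TestNG DataProvider pattern: each replication test now takes a
boolean dataFlag and is invoked twice, once with files staged in the source directory and once
with the directory left empty, so empty-directory replication is exercised by the same
assertions. A minimal standalone sketch of the pattern (illustrative only; the class and method
names below are hypothetical, not part of this commit):

import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class DataFlagPatternSketch {

    // Each inner array is the parameter list for one test invocation.
    @DataProvider(name = "dataFlagProvider")
    public Object[][] dataFlagProvider() {
        return new Object[][]{
            {true},   // scenario: source directory contains data
            {false},  // scenario: source directory is empty
        };
    }

    // TestNG calls this once per row: first dataFlag=true, then dataFlag=false.
    @Test(dataProvider = "dataFlagProvider")
    public void replicationScenario(boolean dataFlag) {
        if (dataFlag) {
            // stage sample files in the source path before triggering replication
            System.out.println("running with data in the source directory");
        } else {
            // leave the source path empty; replication should still succeed
            System.out.println("running with an empty source directory");
        }
    }
}

Because dataFlag only gates the HadoopUtil.copyDataToFolder() calls, both invocations run the
identical checks: coordinator creation, instance success, and the source/target file-list
comparison (plus the _SUCCESS or availability-flag assertion).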
