Repository: falcon
Updated Branches:
  refs/heads/master b0a40c5ab -> ccdf02e7e


FALCON-1058 Test for Feed Replication with Empty Directories. Contributed by Pragya M


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ccdf02e7
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ccdf02e7
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ccdf02e7

Branch: refs/heads/master
Commit: ccdf02e7e9dd95522a1cc1ad583d556ebf92e0d9
Parents: b0a40c5
Author: samarthg <[email protected]>
Authored: Wed Mar 4 07:00:01 2015 +0000
Committer: samarthg <[email protected]>
Committed: Wed Mar 4 07:00:01 2015 +0000

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   3 +
 .../falcon/regression/FeedReplicationTest.java  | 177 +++++++++++--------
 2 files changed, 103 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/ccdf02e7/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 8df604a..1813952 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -55,10 +55,13 @@ Trunk (Unreleased)
    via Samarth Gupta)
 
   IMPROVEMENTS
+   FALCON-1058 Test for Feed Replication with Empty Directories(Pragya M via Samarth Gupta)
 
    FALCON-1046 Add test for process update with user feature(Karishma G via Samarth Gupta)
+   
    FALCON-1017 FeedReplicationTest modified to check for _SUCCESS getting created on
    target directory(Pragya M via Samarth G)
+   
    FALCON-1040 Modifying ProcessInstanceStatusTest to expose job id for running jobs in
    Falcon. (Pragya M via Samarth G)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/ccdf02e7/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
index 3930c8c..f3a8318 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
@@ -47,6 +47,7 @@ import org.joda.time.format.DateTimeFormatter;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import javax.xml.bind.JAXBException;
@@ -56,6 +57,7 @@ import java.util.List;
 
 /**
  * feed replication test.
+ * Replicates empty directories as well as directories containing data.
  */
 @Test(groups = "embedded")
 public class FeedReplicationTest extends BaseTestClass {
@@ -89,8 +91,9 @@ public class FeedReplicationTest extends BaseTestClass {
     }
 
     @AfterMethod(alwaysRun = true)
-    public void tearDown() {
+    public void tearDown() throws IOException {
         removeTestClassEntities();
+        cleanTestsDirs();
     }
 
     /**
@@ -99,10 +102,10 @@ public class FeedReplicationTest extends BaseTestClass {
      * replication ends test checks if data was replicated correctly.
      * Also checks for presence of _SUCCESS file in target directory.
      */
-    @Test
-    public void replicate1Source1Target()
+    @Test(dataProvider = "dataFlagProvider")
+    public void replicate1Source1Target(boolean dataFlag)
         throws AuthenticationException, IOException, URISyntaxException, JAXBException,
-        OozieClientException, InterruptedException {
+            OozieClientException, InterruptedException {
         Bundle.submitCluster(bundles[0], bundles[1]);
         String startTime = TimeUtil.getTimeWrtSystemTime(0);
         String endTime = TimeUtil.addMinsToTime(startTime, 5);
@@ -115,19 +118,19 @@ public class FeedReplicationTest extends BaseTestClass {
         feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
         //set cluster1 as source
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.SOURCE)
+                        .build()).toString();
         //set cluster2 as target
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.TARGET)
+                        .withDataLocation(targetDataLocation)
+                        .build()).toString();
 
         //submit and schedule feed
         LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
@@ -143,26 +146,28 @@ public class FeedReplicationTest extends BaseTestClass {
 
         Path toSource = new Path(sourceLocation);
         Path toTarget = new Path(targetLocation);
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-            OSUtil.RESOURCES + "feed-s4Replication.xml");
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+        if (dataFlag) {
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
+                    OSUtil.RESOURCES + "feed-s4Replication.xml");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+        }
 
         //check if coordinator exists
         InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
 
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
-                "REPLICATION"), 1);
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                        "REPLICATION"), 1);
 
         //replication should start, wait while it ends
         InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data has been replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
+                .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
         List<Path> cluster2ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
+                .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
 
         AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
 
@@ -179,8 +184,8 @@ public class FeedReplicationTest extends BaseTestClass {
      * targets. When replication ends test checks if data was replicated correctly.
      * Also checks for presence of _SUCCESS file in target directory.
      */
-    @Test
-    public void replicate1Source2Targets() throws Exception {
+    @Test(dataProvider = "dataFlagProvider")
+    public void replicate1Source2Targets(boolean dataFlag) throws Exception {
         Bundle.submitCluster(bundles[0], bundles[1], bundles[2]);
         String startTime = TimeUtil.getTimeWrtSystemTime(0);
         String endTime = TimeUtil.addMinsToTime(startTime, 5);
@@ -193,27 +198,27 @@ public class FeedReplicationTest extends BaseTestClass {
         feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
         //set cluster1 as source
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.SOURCE)
+                        .build()).toString();
         //set cluster2 as target
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.TARGET)
+                        .withDataLocation(targetDataLocation)
+                        .build()).toString();
         //set cluster3 as target
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.TARGET)
+                        .withDataLocation(targetDataLocation)
+                        .build()).toString();
 
         //submit and schedule feed
         LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
@@ -229,9 +234,12 @@ public class FeedReplicationTest extends BaseTestClass {
 
         Path toSource = new Path(sourceLocation);
         Path toTarget = new Path(targetLocation);
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-            OSUtil.RESOURCES + "feed-s4Replication.xml");
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+
+        if (dataFlag) {
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
+                    OSUtil.RESOURCES + "feed-s4Replication.xml");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+        }
 
         //check if all coordinators exist
         InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
@@ -239,26 +247,26 @@ public class FeedReplicationTest extends BaseTestClass {
         InstanceUtil.waitTillInstancesAreCreated(cluster3, feed, 0);
 
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
-                "REPLICATION"), 1);
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed),
+                        "REPLICATION"), 1);
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
-                "REPLICATION"), 1);
+                .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed),
+                        "REPLICATION"), 1);
         //replication on cluster 2 should start, wait till it ends
         InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //replication on cluster 3 should start, wait till it ends
         InstanceUtil.waitTillInstanceReachState(cluster3OC, Util.readEntityName(feed), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data has been replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
+                .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
         List<Path> cluster2ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
+                .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
         List<Path> cluster3ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster3FS, toTarget);
+                .getAllFilesRecursivelyHDFS(cluster3FS, toTarget);
 
         AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
         AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster3ReplicatedData);
@@ -279,8 +287,8 @@ public class FeedReplicationTest extends BaseTestClass {
      * replication starts and when it ends test checks if data was replicated correctly.
      * Also checks for presence of availability flag in target directory.
      */
-    @Test
-    public void availabilityFlagTest() throws Exception {
+    @Test(dataProvider = "dataFlagProvider")
+    public void availabilityFlagTest(boolean dataFlag) throws Exception {
+        //replicate1Source1Target scenario + set availability flag but don't upload required file
         Bundle.submitCluster(bundles[0], bundles[1]);
         String startTime = TimeUtil.getTimeWrtSystemTime(0);
@@ -299,19 +307,19 @@ public class FeedReplicationTest extends BaseTestClass {
         feed = FeedMerlin.fromString(feed).clearFeedClusters().toString();
         //set cluster1 as source
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.SOURCE)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.SOURCE)
+                        .build()).toString();
         //set cluster2 as target
         feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("days(1000000)", ActionType.DELETE)
-                .withValidity(startTime, endTime)
-                .withClusterType(ClusterType.TARGET)
-                .withDataLocation(targetDataLocation)
-                .build()).toString();
+                new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
+                        .withRetention("days(1000000)", ActionType.DELETE)
+                        .withValidity(startTime, endTime)
+                        .withClusterType(ClusterType.TARGET)
+                        .withDataLocation(targetDataLocation)
+                        .build()).toString();
 
         //submit and schedule feed
         LOGGER.info("Feed : " + Util.prettyPrintXml(feed));
@@ -327,42 +335,44 @@ public class FeedReplicationTest extends BaseTestClass {
 
         Path toSource = new Path(sourceLocation);
         Path toTarget = new Path(targetLocation);
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-            OSUtil.RESOURCES + "feed-s4Replication.xml");
-        HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+        if (dataFlag) {
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
+                    OSUtil.RESOURCES + "feed-s4Replication.xml");
+            HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation, OSUtil.RESOURCES + "log_01.txt");
+        }
 
         //check while instance is got created
         InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
 
         //check if coordinator exists
         Assert.assertEquals(InstanceUtil
-            .checkIfFeedCoordExist(cluster2.getFeedHelper(), feedName, "REPLICATION"), 1);
+                .checkIfFeedCoordExist(cluster2.getFeedHelper(), feedName, "REPLICATION"), 1);
 
         //replication should not start even after time
         TimeUtil.sleepSeconds(60);
         InstancesResult r = prism.getFeedHelper().getProcessInstanceStatus(feedName,
-            "?start=" + startTime + "&end=" + endTime);
+                "?start=" + startTime + "&end=" + endTime);
         InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0);
         LOGGER.info("Replication didn't start.");
 
         //create availability flag on source
         HadoopUtil.copyDataToFolder(cluster1FS, sourceLocation,
-            OSUtil.RESOURCES + availabilityFlagName);
+                OSUtil.RESOURCES + availabilityFlagName);
 
         //check if instance become running
         InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-            CoordinatorAction.Status.RUNNING, EntityType.FEED);
+                CoordinatorAction.Status.RUNNING, EntityType.FEED);
 
         //wait till instance succeed
         InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1,
-            CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
+                CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
 
         //check if data was replicated correctly
         List<Path> cluster1ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
+                .getAllFilesRecursivelyHDFS(cluster1FS, toSource);
         LOGGER.info("Data on source cluster: " + cluster1ReplicatedData);
         List<Path> cluster2ReplicatedData = HadoopUtil
-            .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
+                .getAllFilesRecursivelyHDFS(cluster2FS, toTarget);
         LOGGER.info("Data on target cluster: " + cluster2ReplicatedData);
         AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
 
@@ -372,4 +382,17 @@ public class FeedReplicationTest extends BaseTestClass {
         //availabilityFlag should exist in target
         Assert.assertEquals(HadoopUtil.getSuccessFolder(cluster2FS, toTarget, availabilityFlagName), true);
     }
+
+
+    /* Flag value denotes whether to add data for replication or not.
+     * flag=true : add data for replication.
+     * flag=false : let empty directories be replicated.
+     */
+    @DataProvider(name = "dataFlagProvider")
+    private Object[][] dataFlagProvider() {
+        return new Object[][] {
+            new Object[] {true, },
+            new Object[] {false, },
+        };
+    }
 }
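
For readers unfamiliar with the pattern introduced above: TestNG invokes a @Test method once per
row returned by the @DataProvider it names, passing the row's values as the method's arguments,
so each replication scenario in this patch now runs twice, once with data staged in the source
directory and once with it left empty. A minimal self-contained sketch of that mechanism
(hypothetical class and method names, independent of the Falcon test harness):

import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class DataFlagSketch {

    // One row per run: {true} stages sample data first, {false} leaves the
    // source directory empty so only the directory structure is replicated.
    @DataProvider(name = "dataFlagProvider")
    private Object[][] dataFlagProvider() {
        return new Object[][] {
            {true},
            {false},
        };
    }

    @Test(dataProvider = "dataFlagProvider")
    public void replicationScenario(boolean dataFlag) {
        if (dataFlag) {
            // stage sample files on the source cluster before scheduling
        }
        // schedule the feed, wait for the instance to succeed, then assert
        // that the source and target file listings match (empty or not)
    }
}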
