Repository: falcon
Updated Branches:
  refs/heads/master aa522a548 -> 772e38779


FALCON-2171 When feed from multiple colos are replicated, the colo folders get 
overwritten

Changed FeedReplicator to derive the DistCp source and target paths from the 
include path (not just the base directory of the feed).

Unit test added. Manual testing was done with a single colo. Testing on a 
distributed setup is pending.

Author: Pallavi Rao <[email protected]>

Reviewers: @sandeepSamudrala

Closes #290 from pallavi-rao/2171 and squashes the following commits:

a2671b1 [Pallavi Rao] Revert "FALCON-1821 Update git pull merge script to 
accept and update JIRA type"
5228a40 [Pallavi Rao] FALCON-2171 When feed from multiple colos are replicated, 
the colo folders get overwritten
a6d8c6c [Pallavi Rao] FALCON-1821 Update git pull merge script to accept and 
update JIRA type


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/772e3877
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/772e3877
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/772e3877

Branch: refs/heads/master
Commit: 772e38779ec1e8be4fd4aa067247299ca584bdc5
Parents: aa522a5
Author: Pallavi Rao <[email protected]>
Authored: Tue Oct 25 16:41:36 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Tue Oct 25 16:41:36 2016 +0530

----------------------------------------------------------------------
 .../falcon/replication/FeedReplicator.java      | 55 +++++++++++---------
 .../falcon/replication/FeedReplicatorTest.java  | 38 +++++++++++---
 2 files changed, 62 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/772e3877/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git 
a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java 
b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index 9c2c522..39b79e0 100644
--- 
a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ 
b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.replication;
 
+import java.util.Arrays;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Option;
@@ -67,7 +68,6 @@ public class FeedReplicator extends Configured implements 
Tool {
     @Override
     public int run(String[] args) throws Exception {
         CommandLine cmd = getCommand(args);
-        DistCpOptions options = getDistCpOptions(cmd);
 
         Configuration conf = this.getConf();
         // inject wf configs
@@ -81,6 +81,8 @@ public class FeedReplicator extends Configured implements 
Tool {
         final boolean includePathSet = (includePathConf != null)
                 && !IGNORE.equalsIgnoreCase(includePathConf);
 
+        DistCpOptions options = getDistCpOptions(cmd, includePathSet);
+
         String availabilityFlagOpt = cmd.getOptionValue("availabilityFlag");
         if (StringUtils.isEmpty(availabilityFlagOpt)) {
             availabilityFlagOpt = "NA";
@@ -95,7 +97,7 @@ public class FeedReplicator extends Configured implements 
Tool {
         DistCp distCp = (includePathSet)
                 ? new CustomReplicator(conf, options)
                 : new DistCp(conf, options);
-        LOG.info("Started DistCp");
+        LOG.info("Started DistCp with options :" + options);
         Job job = distCp.execute();
 
         if (cmd.hasOption("counterLogDir")
@@ -220,12 +222,32 @@ public class FeedReplicator extends Configured implements 
Tool {
         return new GnuParser().parse(options, args);
     }
 
-    protected DistCpOptions getDistCpOptions(CommandLine cmd) throws 
FalconException, IOException {
+    protected DistCpOptions getDistCpOptions(CommandLine cmd, boolean 
includePathSet)
+        throws FalconException, IOException {
         String[] paths = cmd.getOptionValue("sourcePaths").trim().split(",");
         List<Path> srcPaths = getPaths(paths);
         String targetPathString = cmd.getOptionValue("targetPath").trim();
         Path targetPath = new Path(targetPathString);
 
+        if (includePathSet) {
+            assert srcPaths.size() == 1 : "Source paths more than 1 can't be 
handled";
+
+            Path sourcePath = srcPaths.get(0);
+            Path includePath = new Path(getConf().get("falcon.include.path"));
+            assert includePath.toString().substring(0, 
sourcePath.toString().length()).
+                    equals(sourcePath.toString()) : "Source path is not a 
subset of include path";
+
+            String relativePath = 
includePath.toString().substring(sourcePath.toString().length());
+            String fixedPath = getFixedPath(relativePath);
+
+            fixedPath = StringUtils.stripStart(fixedPath, "/");
+            if (StringUtils.isNotEmpty(fixedPath)) {
+                sourcePath = new Path(sourcePath, fixedPath);
+                srcPaths = Arrays.asList(new Path[]{sourcePath});
+                targetPath = new Path(targetPath, fixedPath);
+            }
+        }
+
         return DistCPOptionsUtil.getDistCpOptions(cmd, srcPaths, targetPath, 
false, getConf());
     }
 
@@ -237,31 +259,14 @@ public class FeedReplicator extends Configured implements 
Tool {
         return listPaths;
     }
 
-    private void executePostProcessing(Configuration conf, DistCpOptions 
options) throws IOException, FalconException {
+    private void executePostProcessing(Configuration conf, DistCpOptions 
options)
+        throws IOException, FalconException {
         Path targetPath = options.getTargetPath();
         FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
                 targetPath.toUri(), getConf());
-        List<Path> inPaths = options.getSourcePaths();
-        assert inPaths.size() == 1 : "Source paths more than 1 can't be 
handled";
-
-        Path sourcePath = inPaths.get(0);
-        Path includePath = new Path(getConf().get("falcon.include.path"));
-        assert includePath.toString().substring(0, 
sourcePath.toString().length()).
-                equals(sourcePath.toString()) : "Source path is not a subset 
of include path";
-
-        String relativePath = 
includePath.toString().substring(sourcePath.toString().length());
-        String fixedPath = getFixedPath(relativePath);
-
-        fixedPath = StringUtils.stripStart(fixedPath, "/");
-        Path finalOutputPath;
-        if (StringUtils.isNotEmpty(fixedPath)) {
-            finalOutputPath = new Path(targetPath, fixedPath);
-        } else {
-            finalOutputPath = targetPath;
-        }
 
         final String availabilityFlag = 
conf.get("falcon.feed.availability.flag");
-        FileStatus[] files = fs.globStatus(finalOutputPath);
+        FileStatus[] files = fs.globStatus(targetPath);
         if (files != null) {
             for (FileStatus file : files) {
                 fs.create(new Path(file.getPath(), availabilityFlag)).close();
@@ -269,8 +274,8 @@ public class FeedReplicator extends Configured implements 
Tool {
             }
         } else {
             // As distcp is not copying empty directories we are creating 
availabilityFlag file here
-            fs.create(new Path(finalOutputPath, availabilityFlag)).close();
-            LOG.info("No files present in path: {}", finalOutputPath);
+            fs.create(new Path(targetPath, availabilityFlag)).close();
+            LOG.info("No files present in path: {}", targetPath);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/772e3877/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
----------------------------------------------------------------------
diff --git 
a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
 
b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
index b9b383d..4219767 100644
--- 
a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
+++ 
b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
@@ -17,17 +17,17 @@
  */
 package org.apache.falcon.replication;
 
-import org.apache.falcon.cluster.util.EmbeddedCluster;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.commons.cli.CommandLine;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.Storage;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Test class for FeedReplicator.
  */
@@ -61,7 +61,7 @@ public class FeedReplicatorTest {
         FeedReplicator replicator = new FeedReplicator();
         CommandLine cmd = replicator.getCommand(args);
         replicator.setConf(cluster.getConf());
-        DistCpOptions options = replicator.getDistCpOptions(cmd);
+        DistCpOptions options = replicator.getDistCpOptions(cmd, false);
 
         List<Path> srcPaths = new ArrayList<Path>();
         srcPaths.add(new Path(defaultPath));
@@ -116,7 +116,7 @@ public class FeedReplicatorTest {
 
         FeedReplicator replicator = new FeedReplicator();
         CommandLine cmd = replicator.getCommand(optionalArgs);
-        DistCpOptions options = replicator.getDistCpOptions(cmd);
+        DistCpOptions options = replicator.getDistCpOptions(cmd, false);
 
         List<Path> srcPaths = new ArrayList<Path>();
         srcPaths.add(new Path(defaultPath));
@@ -124,6 +124,32 @@ public class FeedReplicatorTest {
         validateOptionalArguments(options);
     }
 
+    @Test
+    public void testIncludePath() throws Exception {
+        // Set the include Path so that CustomReplicator is used and the 
source and targetPaths are modified.
+        String includePath = defaultPath + "/test-colo";
+        // creates jailed cluster in which DistCpOtions command can be tested.
+        EmbeddedCluster cluster = 
EmbeddedCluster.newCluster("FeedReplicatorTest");
+
+        final String[] args = {
+            "true",
+            "-maxMaps", "3",
+            "-mapBandwidth", "4",
+            "-sourcePaths", defaultPath,
+            "-targetPath", defaultPath,
+            "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
+        };
+
+        FeedReplicator replicator = new FeedReplicator();
+        CommandLine cmd = replicator.getCommand(args);
+        Configuration conf = cluster.getConf();
+        conf.set("falcon.include.path", includePath);
+        replicator.setConf(conf);
+        DistCpOptions options = replicator.getDistCpOptions(cmd, true);
+        Assert.assertEquals(options.getTargetPath().toString(), includePath);
+        Assert.assertEquals(options.getSourcePaths().get(0).toString(), 
includePath);
+    }
+
     private void validateMandatoryArguments(DistCpOptions options, List<Path> 
srcPaths, boolean shouldSyncFolder) {
         Assert.assertEquals(options.getMaxMaps(), 3);
         Assert.assertEquals(options.getMapBandwidth(), 4);

Reply via email to