Repository: falcon Updated Branches: refs/heads/master aa522a548 -> 772e38779
FALCON-2171 When feed from multiple colos are replicated, the colo folders get overwritten. Made changes to use the include path to set the source and target of DistCp (not just the base directory of the feed). Unit test added. Manual testing done with a single colo; testing on a distributed setup is pending. Author: Pallavi Rao <[email protected]> Reviewers: @sandeepSamudrala Closes #290 from pallavi-rao/2171 and squashes the following commits: a2671b1 [Pallavi Rao] Revert "FALCON-1821 Update git pull merge script to accept and update JIRA type" 5228a40 [Pallavi Rao] FALCON-2171 When feed from multiple colos are replicated, the colo folders get overwritten a6d8c6c [Pallavi Rao] FALCON-1821 Update git pull merge script to accept and update JIRA type Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/772e3877 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/772e3877 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/772e3877 Branch: refs/heads/master Commit: 772e38779ec1e8be4fd4aa067247299ca584bdc5 Parents: aa522a5 Author: Pallavi Rao <[email protected]> Authored: Tue Oct 25 16:41:36 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Oct 25 16:41:36 2016 +0530 ---------------------------------------------------------------------- .../falcon/replication/FeedReplicator.java | 55 +++++++++++--------- .../falcon/replication/FeedReplicatorTest.java | 38 +++++++++++--- 2 files changed, 62 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/772e3877/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java ---------------------------------------------------------------------- diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java index 9c2c522..39b79e0 100644 --- 
a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java +++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.replication; +import java.util.Arrays; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Option; @@ -67,7 +68,6 @@ public class FeedReplicator extends Configured implements Tool { @Override public int run(String[] args) throws Exception { CommandLine cmd = getCommand(args); - DistCpOptions options = getDistCpOptions(cmd); Configuration conf = this.getConf(); // inject wf configs @@ -81,6 +81,8 @@ public class FeedReplicator extends Configured implements Tool { final boolean includePathSet = (includePathConf != null) && !IGNORE.equalsIgnoreCase(includePathConf); + DistCpOptions options = getDistCpOptions(cmd, includePathSet); + String availabilityFlagOpt = cmd.getOptionValue("availabilityFlag"); if (StringUtils.isEmpty(availabilityFlagOpt)) { availabilityFlagOpt = "NA"; @@ -95,7 +97,7 @@ public class FeedReplicator extends Configured implements Tool { DistCp distCp = (includePathSet) ? 
new CustomReplicator(conf, options) : new DistCp(conf, options); - LOG.info("Started DistCp"); + LOG.info("Started DistCp with options :" + options); Job job = distCp.execute(); if (cmd.hasOption("counterLogDir") @@ -220,12 +222,32 @@ public class FeedReplicator extends Configured implements Tool { return new GnuParser().parse(options, args); } - protected DistCpOptions getDistCpOptions(CommandLine cmd) throws FalconException, IOException { + protected DistCpOptions getDistCpOptions(CommandLine cmd, boolean includePathSet) + throws FalconException, IOException { String[] paths = cmd.getOptionValue("sourcePaths").trim().split(","); List<Path> srcPaths = getPaths(paths); String targetPathString = cmd.getOptionValue("targetPath").trim(); Path targetPath = new Path(targetPathString); + if (includePathSet) { + assert srcPaths.size() == 1 : "Source paths more than 1 can't be handled"; + + Path sourcePath = srcPaths.get(0); + Path includePath = new Path(getConf().get("falcon.include.path")); + assert includePath.toString().substring(0, sourcePath.toString().length()). 
+ equals(sourcePath.toString()) : "Source path is not a subset of include path"; + + String relativePath = includePath.toString().substring(sourcePath.toString().length()); + String fixedPath = getFixedPath(relativePath); + + fixedPath = StringUtils.stripStart(fixedPath, "/"); + if (StringUtils.isNotEmpty(fixedPath)) { + sourcePath = new Path(sourcePath, fixedPath); + srcPaths = Arrays.asList(new Path[]{sourcePath}); + targetPath = new Path(targetPath, fixedPath); + } + } + return DistCPOptionsUtil.getDistCpOptions(cmd, srcPaths, targetPath, false, getConf()); } @@ -237,31 +259,14 @@ public class FeedReplicator extends Configured implements Tool { return listPaths; } - private void executePostProcessing(Configuration conf, DistCpOptions options) throws IOException, FalconException { + private void executePostProcessing(Configuration conf, DistCpOptions options) + throws IOException, FalconException { Path targetPath = options.getTargetPath(); FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( targetPath.toUri(), getConf()); - List<Path> inPaths = options.getSourcePaths(); - assert inPaths.size() == 1 : "Source paths more than 1 can't be handled"; - - Path sourcePath = inPaths.get(0); - Path includePath = new Path(getConf().get("falcon.include.path")); - assert includePath.toString().substring(0, sourcePath.toString().length()). 
- equals(sourcePath.toString()) : "Source path is not a subset of include path"; - - String relativePath = includePath.toString().substring(sourcePath.toString().length()); - String fixedPath = getFixedPath(relativePath); - - fixedPath = StringUtils.stripStart(fixedPath, "/"); - Path finalOutputPath; - if (StringUtils.isNotEmpty(fixedPath)) { - finalOutputPath = new Path(targetPath, fixedPath); - } else { - finalOutputPath = targetPath; - } final String availabilityFlag = conf.get("falcon.feed.availability.flag"); - FileStatus[] files = fs.globStatus(finalOutputPath); + FileStatus[] files = fs.globStatus(targetPath); if (files != null) { for (FileStatus file : files) { fs.create(new Path(file.getPath(), availabilityFlag)).close(); @@ -269,8 +274,8 @@ public class FeedReplicator extends Configured implements Tool { } } else { // As distcp is not copying empty directories we are creating availabilityFlag file here - fs.create(new Path(finalOutputPath, availabilityFlag)).close(); - LOG.info("No files present in path: {}", finalOutputPath); + fs.create(new Path(targetPath, availabilityFlag)).close(); + LOG.info("No files present in path: {}", targetPath); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/772e3877/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java ---------------------------------------------------------------------- diff --git a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java index b9b383d..4219767 100644 --- a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java +++ b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java @@ -17,17 +17,17 @@ */ package org.apache.falcon.replication; -import org.apache.falcon.cluster.util.EmbeddedCluster; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.cli.CommandLine; +import 
org.apache.falcon.cluster.util.EmbeddedCluster; import org.apache.falcon.entity.Storage; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.tools.DistCpOptions; import org.testng.Assert; import org.testng.annotations.Test; -import java.util.ArrayList; -import java.util.List; - /** * Test class for FeedReplicator. */ @@ -61,7 +61,7 @@ public class FeedReplicatorTest { FeedReplicator replicator = new FeedReplicator(); CommandLine cmd = replicator.getCommand(args); replicator.setConf(cluster.getConf()); - DistCpOptions options = replicator.getDistCpOptions(cmd); + DistCpOptions options = replicator.getDistCpOptions(cmd, false); List<Path> srcPaths = new ArrayList<Path>(); srcPaths.add(new Path(defaultPath)); @@ -116,7 +116,7 @@ public class FeedReplicatorTest { FeedReplicator replicator = new FeedReplicator(); CommandLine cmd = replicator.getCommand(optionalArgs); - DistCpOptions options = replicator.getDistCpOptions(cmd); + DistCpOptions options = replicator.getDistCpOptions(cmd, false); List<Path> srcPaths = new ArrayList<Path>(); srcPaths.add(new Path(defaultPath)); @@ -124,6 +124,32 @@ public class FeedReplicatorTest { validateOptionalArguments(options); } + @Test + public void testIncludePath() throws Exception { + // Set the include Path so that CustomReplicator is used and the source and targetPaths are modified. + String includePath = defaultPath + "/test-colo"; + // creates jailed cluster in which DistCpOtions command can be tested. 
+ EmbeddedCluster cluster = EmbeddedCluster.newCluster("FeedReplicatorTest"); + + final String[] args = { + "true", + "-maxMaps", "3", + "-mapBandwidth", "4", + "-sourcePaths", defaultPath, + "-targetPath", defaultPath, + "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(), + }; + + FeedReplicator replicator = new FeedReplicator(); + CommandLine cmd = replicator.getCommand(args); + Configuration conf = cluster.getConf(); + conf.set("falcon.include.path", includePath); + replicator.setConf(conf); + DistCpOptions options = replicator.getDistCpOptions(cmd, true); + Assert.assertEquals(options.getTargetPath().toString(), includePath); + Assert.assertEquals(options.getSourcePaths().get(0).toString(), includePath); + } + private void validateMandatoryArguments(DistCpOptions options, List<Path> srcPaths, boolean shouldSyncFolder) { Assert.assertEquals(options.getMaxMaps(), 3); Assert.assertEquals(options.getMapBandwidth(), 4);
