Repository: falcon Updated Branches: refs/heads/master d0bc18860 -> b135f28f3
FALCON-2049 Feed Replication with Empty Directories are failing Author: bvellanki <[email protected]> Reviewers: "Venkat Ranganathan <[email protected]>, Ying Zheng <[email protected]>, Peeyush B <[email protected]>, Pallavi Rao <[email protected]>" Closes #204 from bvellanki/FALCON-2049 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/b135f28f Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/b135f28f Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/b135f28f Branch: refs/heads/master Commit: b135f28f314b1ec0a89c2e1c21b033e48e7db602 Parents: d0bc188 Author: bvellanki <[email protected]> Authored: Fri Jul 1 09:42:31 2016 -0700 Committer: bvellanki <[email protected]> Committed: Fri Jul 1 09:42:31 2016 -0700 ---------------------------------------------------------------------- replication/pom.xml | 5 +++++ .../falcon/replication/FeedReplicator.java | 19 ++++++++++++----- .../falcon/replication/FeedReplicatorTest.java | 22 +++++++++++++------- 3 files changed, 34 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/b135f28f/replication/pom.xml ---------------------------------------------------------------------- diff --git a/replication/pom.xml b/replication/pom.xml index ff66e52..3f47226 100644 --- a/replication/pom.xml +++ b/replication/pom.xml @@ -52,6 +52,11 @@ <groupId>org.apache.falcon</groupId> <artifactId>falcon-metrics</artifactId> </dependency> + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-test-util</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/falcon/blob/b135f28f/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java ---------------------------------------------------------------------- diff --git 
a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java index a8da51d..0906bd5 100644 --- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java +++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java @@ -184,12 +184,13 @@ public class FeedReplicator extends Configured implements Tool { return new GnuParser().parse(options, args); } - protected DistCpOptions getDistCpOptions(CommandLine cmd) { + protected DistCpOptions getDistCpOptions(CommandLine cmd) throws FalconException, IOException { String[] paths = cmd.getOptionValue("sourcePaths").trim().split(","); List<Path> srcPaths = getPaths(paths); - String trgPath = cmd.getOptionValue("targetPath").trim(); + String targetPathString = cmd.getOptionValue("targetPath").trim(); + Path targetPath = new Path(targetPathString); - DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new Path(trgPath)); + DistCpOptions distcpOptions = new DistCpOptions(srcPaths, targetPath); distcpOptions.setBlocking(true); distcpOptions.setMaxMaps(Integer.parseInt(cmd.getOptionValue("maxMaps"))); distcpOptions.setMapBandwidth(Integer.parseInt(cmd.getOptionValue("mapBandwidth"))); @@ -214,8 +215,16 @@ public class FeedReplicator extends Configured implements Tool { // Removing deleted files by default - FALCON-1844 String removeDeletedFiles = cmd.getOptionValue( ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName(), "true"); - distcpOptions.setDeleteMissing(Boolean.parseBoolean(removeDeletedFiles)); - + boolean deleteMissing = Boolean.parseBoolean(removeDeletedFiles); + distcpOptions.setDeleteMissing(deleteMissing); + if (deleteMissing) { + // DistCP will fail with InvalidInputException if deleteMissing is set to true and + // if targetPath does not exist. Create targetPath to avoid failures. 
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(targetPath.toUri(), getConf()); + if (!fs.exists(targetPath)) { + fs.mkdirs(targetPath); + } + } String preserveBlockSize = cmd.getOptionValue( ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName()); http://git-wip-us.apache.org/repos/asf/falcon/blob/b135f28f/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java ---------------------------------------------------------------------- diff --git a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java index e7e177e..2662ade 100644 --- a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java +++ b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java @@ -17,6 +17,7 @@ */ package org.apache.falcon.replication; +import org.apache.falcon.cluster.util.EmbeddedCluster; import org.apache.commons.cli.CommandLine; import org.apache.falcon.entity.Storage; import org.apache.hadoop.fs.Path; @@ -32,6 +33,8 @@ import java.util.List; */ public class FeedReplicatorTest { + private String defaultPath = "jail://FeedReplicatorTest:00/tmp"; + @Test public void testArguments() throws Exception { /* @@ -42,21 +45,26 @@ public class FeedReplicatorTest { * <arg>-sourcePaths</arg><arg>${distcpSourcePaths}</arg> * <arg>-targetPath</arg><arg>${distcpTargetPaths}</arg> */ + + // creates jailed cluster in which DistCpOptions command can be tested. 
+ EmbeddedCluster cluster = EmbeddedCluster.newCluster("FeedReplicatorTest"); + final String[] args = { "true", "-maxMaps", "3", "-mapBandwidth", "4", - "-sourcePaths", "hdfs://localhost:8020/tmp/", - "-targetPath", "hdfs://localhost1:8020/tmp/", + "-sourcePaths", defaultPath, + "-targetPath", defaultPath, "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(), }; FeedReplicator replicator = new FeedReplicator(); CommandLine cmd = replicator.getCommand(args); + replicator.setConf(cluster.getConf()); DistCpOptions options = replicator.getDistCpOptions(cmd); List<Path> srcPaths = new ArrayList<Path>(); - srcPaths.add(new Path("hdfs://localhost:8020/tmp/")); + srcPaths.add(new Path(defaultPath)); validateMandatoryArguments(options, srcPaths, true); Assert.assertTrue(options.shouldDeleteMissing()); } @@ -82,8 +90,8 @@ public class FeedReplicatorTest { "true", "-maxMaps", "3", "-mapBandwidth", "4", - "-sourcePaths", "hdfs://localhost:8020/tmp/", - "-targetPath", "hdfs://localhost1:8020/tmp/", + "-sourcePaths", defaultPath, + "-targetPath", defaultPath, "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(), "-overwrite", "true", "-ignoreErrors", "false", @@ -99,7 +107,7 @@ public class FeedReplicatorTest { DistCpOptions options = replicator.getDistCpOptions(cmd); List<Path> srcPaths = new ArrayList<Path>(); - srcPaths.add(new Path("hdfs://localhost:8020/tmp/")); + srcPaths.add(new Path(defaultPath)); validateMandatoryArguments(options, srcPaths, false); validateOptionalArguments(options); } @@ -108,7 +116,7 @@ public class FeedReplicatorTest { Assert.assertEquals(options.getMaxMaps(), 3); Assert.assertEquals(options.getMapBandwidth(), 4); Assert.assertEquals(options.getSourcePaths(), srcPaths); - Assert.assertEquals(options.getTargetPath(), new Path("hdfs://localhost1:8020/tmp/")); + Assert.assertEquals(options.getTargetPath(), new Path(defaultPath)); Assert.assertEquals(options.shouldSyncFolder(), shouldSyncFolder); }
