Repository: crunch
Updated Branches:
  refs/heads/master 16209bed6 -> a670b9169


CRUNCH-543: Have AvroPathPerKeyTarget handle child directories properly


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a670b916
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a670b916
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a670b916

Branch: refs/heads/master
Commit: a670b91697bcb89faff252dd4204d8273b6dbf2d
Parents: 16209be
Author: Josh Wills <[email protected]>
Authored: Thu Jul 16 16:07:38 2015 -0700
Committer: Josh Wills <[email protected]>
Committed: Thu Jul 16 16:07:38 2015 -0700

----------------------------------------------------------------------
 .../apache/crunch/io/avro/AvroPathPerKeyIT.java | 39 ++++++++++++++++++++
 .../crunch/io/avro/AvroPathPerKeyTarget.java    | 29 +++++++++++----
 2 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/a670b916/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
index c1f7fa6..e674229 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
@@ -93,4 +93,43 @@ public class AvroPathPerKeyIT extends CrunchTestSupport implements Serializable
     assertFalse(fs.exists(outDir));
   }
 
+  @Test
+  public void testOutputFilePerKey_Directories() throws Exception {
+    Pipeline p = new MRPipeline(AvroPathPerKeyIT.class, tempDir.getDefaultConfiguration());
+    Path outDir = tempDir.getPath("out");
+    p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+        .parallelDo(new MapFn<String, Pair<String, String>>() {
+          @Override
+          public Pair<String, String> map(String input) {
+            String[] p = input.split("\t");
+            return Pair.of(p[0] + "/child", p[1]);
+          }
+        }, Avros.tableOf(Avros.strings(), Avros.strings()))
+        .groupByKey()
+        .write(new AvroPathPerKeyTarget(outDir));
+    p.done();
+
+    Set<String> names = Sets.newHashSet();
+    FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+    for (FileStatus fstat : fs.listStatus(outDir)) {
+      names.add(fstat.getPath().getName());
+    }
+    assertEquals(ImmutableSet.of("A", "B", "_SUCCESS"), names);
+
+    Path aParent = new Path(outDir, "A");
+    FileStatus[] aParentStat = fs.listStatus(aParent);
+    assertEquals(1, aParentStat.length);
+    assertEquals("child", aParentStat[0].getPath().getName());
+    FileStatus[] aChildStat = fs.listStatus(new Path(aParent, "child"));
+    assertEquals(1, aChildStat.length);
+    assertEquals("part-r-00000.avro", aChildStat[0].getPath().getName());
+
+    Path bParent = new Path(outDir, "B");
+    FileStatus[] bParentStat = fs.listStatus(bParent);
+    assertEquals(1, bParentStat.length);
+    assertEquals("child", bParentStat[0].getPath().getName());
+    FileStatus[] bChildStat = fs.listStatus(new Path(bParent, "child"));
+    assertEquals(1, bChildStat.length);
+    assertEquals("part-r-00000.avro", bChildStat[0].getPath().getName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a670b916/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index 336b940..d17e5d7 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -91,26 +91,41 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
       LOG.warn("Nothing to copy from {}", base);
       return;
     }
-    Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base));
     FileSystem dstFs = path.getFileSystem(conf);
+    Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base));
     if (!dstFs.exists(path)) {
       dstFs.mkdirs(path);
     }
     boolean sameFs = isCompatible(srcFs, path);
+    move(conf, base, srcFs, path, dstFs, sameFs);
+    dstFs.create(getSuccessIndicator(), true).close();
+  }
+
+  private void move(Configuration conf, Path srcBase, FileSystem srcFs, Path dstBase, FileSystem dstFs, boolean sameFs)
+      throws IOException {
+    Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(srcBase));
+    if (!dstFs.exists(dstBase)) {
+      dstFs.mkdirs(dstBase);
+    }
     for (Path key : keys) {
       Path[] srcs = FileUtil.stat2Paths(srcFs.listStatus(key), key);
-      Path targetPath = new Path(path, key.getName());
+      Path targetPath = new Path(dstBase, key.getName());
       dstFs.mkdirs(targetPath);
       for (Path s : srcs) {
-        Path d = getDestFile(conf, s, targetPath, s.getName().contains("-m-"));
-        if (sameFs) {
-          srcFs.rename(s, d);
+        if (srcFs.isDirectory(s)) {
+          Path nextBase = new Path(targetPath, s.getName());
+          dstFs.mkdirs(nextBase);
+          move(conf, s, srcFs, nextBase, dstFs, sameFs);
         } else {
-          FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
+          Path d = getDestFile(conf, s, targetPath, s.getName().contains("-m-"));
+          if (sameFs) {
+            srcFs.rename(s, d);
+          } else {
+            FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
+          }
         }
       }
     }
-    dstFs.create(getSuccessIndicator(), true).close();
   }
 
   @Override
