Repository: crunch Updated Branches: refs/heads/master d4917a217 -> ada7e3a64
CRUNCH-371 Empty output in AvroPathPerKeyTarget Correctly handle an empty output in AvroPathPerKeyTarget, i.e. do not attempt to copy/create any output paths. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ada7e3a6 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ada7e3a6 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ada7e3a6 Branch: refs/heads/master Commit: ada7e3a6470447dc0b90884a3d59da50b52c5315 Parents: d4917a2 Author: Gabriel Reid <[email protected]> Authored: Sat Mar 29 13:06:52 2014 +0100 Committer: Gabriel Reid <[email protected]> Committed: Mon Mar 31 18:34:53 2014 +0200 ---------------------------------------------------------------------- .../apache/crunch/io/avro/AvroPathPerKeyIT.java | 25 ++++++++++++++++++++ .../crunch/io/avro/AvroPathPerKeyTarget.java | 12 ++++++++-- 2 files changed, 35 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/ada7e3a6/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java index 7b30a60..c1f7fa6 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java @@ -22,6 +22,7 @@ import com.google.common.collect.Sets; import org.apache.crunch.MapFn; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; +import org.apache.crunch.fn.FilterFns; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.From; import org.apache.crunch.test.CrunchTestSupport; @@ -35,6 +36,7 @@ import java.io.Serializable; import java.util.Set; import static org.junit.Assert.assertEquals; 
+import static org.junit.Assert.assertFalse; public class AvroPathPerKeyIT extends CrunchTestSupport implements Serializable { @Test @@ -68,4 +70,27 @@ public class AvroPathPerKeyIT extends CrunchTestSupport implements Serializable assertEquals(1, bStat.length); assertEquals("part-r-00000.avro", bStat[0].getPath().getName()); } + + @Test + public void testOutputFilePerKey_NothingToOutput() throws Exception { + Pipeline p = new MRPipeline(AvroPathPerKeyIT.class, tempDir.getDefaultConfiguration()); + Path outDir = tempDir.getPath("out"); + + p.read(From.textFile(tempDir.copyResourceFileName("docs.txt"))) + .parallelDo(new MapFn<String, Pair<String, String>>() { + @Override + public Pair<String, String> map(String input) { + String[] p = input.split("\t"); + return Pair.of(p[0], p[1]); + } + }, Avros.tableOf(Avros.strings(), Avros.strings())) + .filter(FilterFns.<Pair<String, String>>REJECT_ALL()) + .groupByKey() + .write(new AvroPathPerKeyTarget(outDir)); + p.done(); + + FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration()); + assertFalse(fs.exists(outDir)); + } + } http://git-wip-us.apache.org/repos/asf/crunch/blob/ada7e3a6/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java index 6befbad..5c47b8a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java @@ -18,6 +18,8 @@ package org.apache.crunch.io.avro; import org.apache.avro.mapred.AvroWrapper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.crunch.impl.mr.plan.PlanningParameters; import org.apache.crunch.io.FileNamingScheme; import 
org.apache.crunch.io.FormatBundle; @@ -26,8 +28,8 @@ import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.FileTargetImpl; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; -import org.apache.crunch.types.avro.AvroPathPerKeyOutputFormat; import org.apache.crunch.types.avro.AvroMode; +import org.apache.crunch.types.avro.AvroPathPerKeyOutputFormat; import org.apache.crunch.types.avro.AvroType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -47,6 +49,8 @@ import java.io.IOException; */ public class AvroPathPerKeyTarget extends FileTargetImpl { + private static final Log LOG = LogFactory.getLog(AvroPathPerKeyTarget.class); + public AvroPathPerKeyTarget(String path) { this(new Path(path)); } @@ -89,7 +93,11 @@ public class AvroPathPerKeyTarget extends FileTargetImpl { public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException { FileSystem srcFs = workingPath.getFileSystem(conf); Path base = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index); - Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base), base); + if (!srcFs.exists(base)) { + LOG.warn("Nothing to copy from " + base); + return; + } + Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base)); FileSystem dstFs = path.getFileSystem(conf); if (!dstFs.exists(path)) { dstFs.mkdirs(path);
